Thursday, December 24, 2015

Spring Cloud Rest Client with Netflix Ribbon - Customizations

In an earlier blog post I had covered the basic configuration involved in making a REST call using Spring Cloud, which internally utilizes the Netflix Ribbon libraries to load balance calls, along with basic settings like the read timeout, number of retries etc. Here I will go over some more customizations that require going beyond the configuration files.

Use Case

My use case is very simple - I want to specify the URL(s) that a REST call is invoked against. This may appear straightforward to start with, however there are a few catches to consider. By default, if Spring Cloud sees Eureka related libraries in the classpath, the behavior is to use Eureka to discover the instances of a service and load balance calls across those instances.

Approach 1 - Use a non-loadbalanced Rest Template

An approach that will work is to use an instance of RestTemplate that does not use Ribbon at all:

@Bean
public RestOperations nonLoadbalancedRestTemplate() {
    return new RestTemplate();
}

Now, wherever the Rest Template is needed, this instance can be injected in. Keep in mind that Spring Cloud would also have instantiated another instance that supports Eureka, so the injection has to be done by name, this way:

@Service("restTemplateDirectPongClient")
public class RestTemplateDirectPongClient implements PongClient {

    private final RestOperations restTemplate;

    @Autowired
    public RestTemplateDirectPongClient(@Qualifier("nonLoadbalancedRestTemplate") RestOperations restTemplate) {
        this.restTemplate = restTemplate;
    }

    ...
}

The big catch with this approach, however, is that by bypassing Ribbon all the features that Ribbon provides are lost - there is no automatic retry, no read and connect timeouts, and no load balancing in case there are multiple URLs.
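
Read and connect timeouts could still be set by hand on the underlying request factory - a minimal sketch, with purely illustrative timeout values:

@Bean
public RestOperations nonLoadbalancedRestTemplate() {
    // Plain Spring HTTP request factory - no Ribbon involved
    SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
    requestFactory.setConnectTimeout(2000);
    requestFactory.setReadTimeout(5000);
    return new RestTemplate(requestFactory);
}

Even with that in place, retries and load balancing across multiple URLs are still missing, so a better approach may be the following.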

Approach 2 - Customize Ribbon based Rest Template

In the earlier blog post I had shown some basic customization of Ribbon which can be made using a configuration file:

samplepong:
  ribbon:
    DeploymentContextBasedVipAddresses: sample-pong
    ReadTimeout: 5000
    MaxAutoRetries: 2

Not all of the customizations that you would normally make through a Ribbon configuration file carry over to Spring Cloud, though. In this specific instance I want to use a list of server instances that I specify, instead of letting Ribbon find them through a Eureka call. With raw Ribbon this is specified the following way:

samplepong.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
samplepong.ribbon.listOfServers=127.0.0.1:8082

This specific configuration will not work with Spring Cloud, however; here the way to specify a list of servers is through a Ribbon configuration class along these lines:

package org.bk.noscan.consumer.ribbon;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ConfigurationBasedServerList;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerList;
import org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PongDirectCallRibbonConfiguration extends RibbonClientConfiguration {

    @Bean
    @Override
    public ServerList<Server> ribbonServerList(IClientConfig clientConfig) {
        ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
        serverList.initWithNiwsConfig(clientConfig);
        return serverList;
    }

}

and telling Ribbon to use this configuration for the specific "named client" that we are concerned about:

@RibbonClients({
        @RibbonClient(name = "samplepongdirect", configuration = PongDirectCallRibbonConfiguration.class),
})

With this configuration in place, the list of servers can now be specified using configuration this way:

samplepongdirect:
  ribbon:
    DeploymentContextBasedVipAddresses: sample-pong
    listOfServers: localhost:8082
    ReadTimeout: 5000
    MaxAutoRetries: 2

One thing to note is that since the Ribbon configuration is a normal Spring configuration it would likely get picked up by the @ComponentScan of the application; since it is meant only for this specific Ribbon client we do not want that. I have avoided it by placing the class in a package ("org.bk.noscan.*") that is outside the normal classpath scan. I am not sure if there is a cleaner way to do this, but the approach has worked well for me.
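
If moving the class to a non-scanned package is not desirable, another option could be to exclude it explicitly from component scanning - a sketch, where the application class name and the explicit @ComponentScan are assumptions for illustration only:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;

@Configuration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = @ComponentScan.Filter(
        type = FilterType.ASSIGNABLE_TYPE,
        classes = PongDirectCallRibbonConfiguration.class))
public class SampleConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SampleConsumerApplication.class, args);
    }
}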

This approach is a little more involved than the first one, however the advantage is that once it is in place all the features of Ribbon carry over.

Conclusion

This concludes the customizations involved in using Spring Cloud with Ribbon. If you are interested in exploring the code a little further I have this integrated in my github repo here.

References

Spring Cloud reference documentation has been an awesome source of information for all the details presented here. I had mistakenly opened a github issue thinking that it was a Spring Cloud issue and got some very useful information through the discussion here.

Sunday, December 6, 2015

Spring Cloud Rest Client with Netflix Ribbon - Basics

In an earlier blog post I had covered the different options for a REST client in the Spring Cloud world. All the options wrap around a Netflix OSS based component called Ribbon, which handles the aspects related to load balancing calls across the different instances hosting a service, handling failovers, timeouts etc. Here I will cover a few ways to customize the behavior of the underlying Ribbon components when used with Spring Cloud, and follow it up with more comprehensive customizations.

Creating a Rest Client


To recap, first consider a case where a simple service needs to be called:



A typical way to make this call using Spring is to inject in a RestTemplate and use it to make the call, the following way:


public class RestTemplateBasedPongClient implements PongClient {

    @Autowired
    private RestTemplate restTemplate;

    @Override
    public MessageAcknowledgement sendMessage(Message message) {
        String pongServiceUrl = "http://serviceurl/message";
        HttpEntity<Message> requestEntity = new HttpEntity<>(message);
        ResponseEntity<MessageAcknowledgement> response =  this.restTemplate.exchange(pongServiceUrl, HttpMethod.POST, requestEntity, MessageAcknowledgement.class, Maps.newHashMap());
        return response.getBody();
    }

}

There is nothing special here. When using Spring Cloud, however, the same code behaves differently - the RestTemplate now internally uses the Netflix OSS Ribbon libraries to make the call. This helps, as the typical call flow is to first find the instances running the service, then load balance the calls across those instances, and maintain this state.

Rest Client With Ribbon


Let me digress a little to touch on Ribbon. Ribbon uses an abstraction called a "Named client" to control the behavior of a remote service call - the name by which the service has registered with Eureka, the timeout for service calls, how many retries in case of failures etc. These are specified through configuration files, and the entries are typically along these lines; note that the "Named client" here is "samplepong" and the properties have this as a prefix:

samplepong.ribbon.MaxAutoRetries=2
samplepong.ribbon.MaxAutoRetriesNextServer=2
samplepong.ribbon.OkToRetryOnAllOperations=true
samplepong.ribbon.ServerListRefreshInterval=2000
samplepong.ribbon.ConnectTimeout=5000
samplepong.ribbon.ReadTimeout=90000
samplepong.ribbon.EnableZoneAffinity=false
samplepong.ribbon.DeploymentContextBasedVipAddresses=sample-pong
samplepong.ribbon.NIWSServerListClassName=com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList


Coming back to Spring Cloud, it supports the concept of a "Named client" in a very clever way - through the hostname of the URL - so the RestTemplate call would now look like this:

ResponseEntity<MessageAcknowledgement> response =  this.restTemplate.exchange("http://samplepong/message", HttpMethod.POST, requestEntity, MessageAcknowledgement.class, Maps.newHashMap());

The "samplepong" in the url is the "Named client" and any customization for the behavior of the underlying Ribbon can be made by specifying the properties using this prefix. Since this is a Spring Cloud applications the properties can be specified cleanly in a yaml format along these lines:

samplepong:
  ribbon:
    DeploymentContextBasedVipAddresses: sample-pong
    ReadTimeout: 5000
    MaxAutoRetries: 2


Conclusion

This covers the basics of how Spring Cloud abstracts out the underlying Ribbon libraries to provide a very intuitive facade for making remote service calls in a Cloud environment. There are some details that I have skimmed over on some of the customizations; I will cover these in a newer post. Here is my github repo with the code that I have used for the article.

Sunday, November 22, 2015

Spring Cloud support for Hystrix

The Spring Cloud project provides comprehensive support for the Netflix OSS Hystrix library. I have previously written about how to use the raw Hystrix library to wrap remote calls. Here I will be going over how Hystrix can be used with Spring Cloud.

Basics

There is actually not much to it - the raw Hystrix concepts just carry over, with certain Spring Boot specific enhancements. Consider a simple Hystrix command that wraps a call to a remote service:


import agg.samples.domain.Message;
import agg.samples.domain.MessageAcknowledgement;
import agg.samples.feign.RemoteServiceClient;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteMessageClientCommand extends HystrixCommand<MessageAcknowledgement> {
    private static final String COMMAND_GROUP = "demo";
    private static final Logger logger = LoggerFactory.getLogger(RemoteMessageClientCommand.class);

    private final RemoteServiceClient remoteServiceClient;
    private final Message message;

    public RemoteMessageClientCommand(RemoteServiceClient remoteServiceClient, Message message) {
        super(HystrixCommandGroupKey.Factory.asKey(COMMAND_GROUP));
        this.remoteServiceClient = remoteServiceClient;
        this.message = message;
    }

    @Override
    protected MessageAcknowledgement run() throws Exception {
        logger.info("About to make Remote Call");
        return this.remoteServiceClient.sendMessage(this.message);
    }

    @Override
    protected MessageAcknowledgement getFallback() {
        return new MessageAcknowledgement(message.getId(), message.getPayload(), "Fallback message");
    }
}

There are no Spring related classes here and this command can be used directly in a Spring based project, say in a controller the following way:


@RestController
public class RemoteCallDirectCommandController {

    @Autowired
    private RemoteServiceClient remoteServiceClient;

    @RequestMapping("/messageDirectCommand")
    public MessageAcknowledgement sendMessage(Message message) {
        RemoteMessageClientCommand remoteCallCommand = new RemoteMessageClientCommand(remoteServiceClient, message);
        return remoteCallCommand.execute();
    }
}

The behavior of a Hystrix command is normally customized through Netflix OSS Archaius properties; however, Spring Cloud provides a bridge that makes the Spring defined properties visible as Archaius properties. In short this means that I can define my properties using Spring specific configuration files and they would be visible when customizing the command behavior.

So if we were earlier customizing, say, a HelloWorldCommand's behavior using Archaius properties which look like this:

hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000
hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREAD
hystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000
hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50
hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20
hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000

this can be done in the Spring Cloud world the exact same way in an application.properties file, or in an application.yml file the following way:

hystrix:
  command:
    HelloWorldCommand:
      metrics:
        rollingStats:
          timeInMilliseconds: 10000
      execution:
        isolation:
          strategy: THREAD
          thread:
            timeoutInMilliseconds: 5000
      circuitBreaker:
        errorThresholdPercentage: 50
        requestVolumeThreshold: 20
        sleepWindowInMilliseconds: 5000

Annotation based Approach

I personally prefer the direct command based approach, however a better fit for Hystrix in the Spring world may be the hystrix-javanica based annotations instead. The use of these annotations is best illustrated with an example. Here is the remote call wrapped in a Hystrix command using annotations:

import agg.samples.domain.Message;
import agg.samples.domain.MessageAcknowledgement;
import agg.samples.feign.RemoteServiceClient;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RemoteMessageAnnotationClient  {

    private final RemoteServiceClient remoteServiceClient;

    @Autowired
    public RemoteMessageAnnotationClient(RemoteServiceClient remoteServiceClient) {
        this.remoteServiceClient = remoteServiceClient;
    }

    @HystrixCommand(fallbackMethod = "defaultMessage", commandKey = "RemoteMessageAnnotationClient" )
    public MessageAcknowledgement sendMessage(Message message) {
        return this.remoteServiceClient.sendMessage(message);
    }

    public MessageAcknowledgement defaultMessage(Message message) {
        return new MessageAcknowledgement("-1", message.getPayload(), "Fallback Payload");
    }

}


These annotations are translated behind the scenes, using an aspect, into regular Hystrix commands. The neat thing though is that there is no ceremony in using this in a Spring Cloud project - it just works. Like before, if the behavior needs to be customized it can be done with the command specific properties. One small catch is that the command name by default is the method name, so in my example the command name would have been "sendMessage"; I have customized it using the annotation to be a different name.
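
With the command key renamed this way, the command specific properties would then use that name as the prefix - for instance, along these lines (the timeout value here is only illustrative):

hystrix:
  command:
    RemoteMessageAnnotationClient:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 1000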

If you are interested in exploring this sample further, here is my github project.

Saturday, November 7, 2015

Gentle Introduction to Hystrix - Wrapup

This is a follow up to two other posts - the motivation for why something like Hystrix is needed in a distributed system, and a basic intro to Hystrix.

This will be a wrap-up of my Hystrix journey, with details of the various properties that can be tweaked to change the behavior of Hystrix, and will touch on a few advanced concepts.

Tweaking Hystrix Behavior

Hystrix configuration is explained in the wiki here; in brief, two broad groups control the properties of Hystrix:

1. Command properties
2. ThreadPool properties

The properties follow an order of precedence that is explained in the wiki; here I will concentrate on the ones specified through a properties file.

For a sample Command defined the following way:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldCommand extends HystrixCommand<String> {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCommand.class);

    private final String name;

    public HelloWorldCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        logger.info("HelloWorld Command Invoked");
        return "Hello " + name;
    }
}

The first behavior that can be tweaked is whether to execute the command in a thread pool or in the same thread of execution as the caller (the SEMAPHORE isolation strategy). If the execution is in a thread pool, then a timeout for the request can be set:

hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREAD
hystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000

The second behavior is the Circuit breaker which works based on information collected during a rolling window of time, configured this way, say for 10 seconds:

hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000

If in this window a certain percentage of failures (say 50%) occurs over a threshold volume of requests (say 20 in 10 seconds), then the circuit is broken, with configuration which looks like this:

hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20
hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50

Once a circuit is broken, it stays open for a duration set the following way - 5 seconds in this instance:

hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000

The threadpool settings are controlled using the Group Key that was specified, called default in this sample. A specific "Threadpool Key" could also have been specified as part of the constructor though.

hystrix.threadpool.default.coreSize=10
hystrix.threadpool.default.queueSizeRejectionThreshold=5

Here 10 commands can potentially be run in parallel and another 5 held in a queue beyond which the requests will be rejected.
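
For reference, a "Threadpool Key" distinct from the group key can be passed in through the command's Setter - a minimal sketch, with a made-up pool name:

public HelloWorldCommand(String name) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("default"))
            .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("helloWorldPool")));
    this.name = name;
}

The threadpool properties would then be prefixed with "hystrix.threadpool.helloWorldPool" instead of "hystrix.threadpool.default".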

Request Collapsing

Tomasz Nurkiewicz on his blog NoBlogDefFound has done an excellent job of explaining Request Collapsing. My example is a little simplistic - consider a case where a lot of requests are being made to retrieve a Person given an id, the following way:

public class PersonService {

    public Person findPerson(Integer id) {
        return new Person(id, "name : " + id);
    }

    public List<Person> findPeople(List<Integer> ids) {
        return ids
                .stream()
                .map(i -> new Person(i, "name : " + i))
                .collect(Collectors.toList());
    }
}

The service responds with a canned response, but assume that the call is to a remote datastore. Also note that this service implements a batched method to retrieve a list of People given a list of ids.

Request Collapsing is a feature which would batch multiple user requests occurring over a time period into a single such remote call and then fan out the response back to the user.

A hystrix command which takes the set of id's and gets the response of people can be defined the following way:

public class PersonRequestCommand extends HystrixCommand<List<Person>>{

    private final List<Integer> ids;
    private final PersonService personService = new PersonService();
    private static final Logger logger = LoggerFactory.getLogger(PersonRequestCommand.class);

    public PersonRequestCommand(List<Integer> ids) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.ids = ids;
    }

    @Override
    protected List<Person> run() throws Exception {
        logger.info("Retrieving details for : " + this.ids);
        return personService.findPeople(this.ids);
    }
}

Fairly straightforward up to this point; the more involved logic is now in the request collapser, which looks like this:

package aggregate.commands.collapsed;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PersonRequestCollapser extends HystrixCollapser<List<Person>, Person, Integer> {

    private final Integer id;
    public PersonRequestCollapser(Integer id) {
        super(Setter.
                withCollapserKey(HystrixCollapserKey.Factory.asKey("personRequestCollapser"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(2000)));
        this.id = id;
    }

    @Override
    public Integer getRequestArgument() {
        return this.id;
    }

    @Override
    protected HystrixCommand<List<Person>> createCommand(Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        List<Integer> ids = collapsedRequests.stream().map(cr -> cr.getArgument()).collect(Collectors.toList());
        return new PersonRequestCommand(ids);
    }

    @Override
    protected void mapResponseToRequests(List<Person> batchResponse, Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        Map<Integer, Person> personMap = batchResponse.stream().collect(Collectors.toMap(Person::getId, Function.identity()));

        for (CollapsedRequest<Person, Integer> cr: collapsedRequests) {
            cr.setResponse(personMap.get(cr.getArgument()));
        }
    }
}


There are a few things going on here. First, the types in the parameterized type signature indicate the type of the batch response (List<Person>), the response type expected by the caller (Person), and the type of the request argument (the id of the person). Then there are two methods - one to create a batch command, and a second to map the responses back to the original requests.

Now, given this, from a user's perspective nothing much changes - the call is made as if to a single command, and Request Collapsing handles the batching, dispatching and mapping back of the responses. This is how a sample test looks:

@Test
public void testCollapse() throws Exception {
    HystrixRequestContext requestContext = HystrixRequestContext.initializeContext();

    logger.info("About to execute Collapsed command");
    List<Observable<Person>> result = new ArrayList<>();
    CountDownLatch cl = new CountDownLatch(1);
    for (int i = 1; i <= 100; i++) {
        result.add(new PersonRequestCollapser(i).observe());
    }

    Observable.merge(result).subscribe(p -> logger.info(p.toString())
            , t -> logger.error(t.getMessage(), t)
            , () -> cl.countDown());
    cl.await();
    logger.info("Completed executing Collapsed Command");
    requestContext.shutdown();
}

Conclusion

There is far more to Hystrix than what I have covered here. It is truly an awesome library, essential in creating a resilient system, and I have come to appreciate the amount of thought that has gone into designing it.


Reference


Here is my github repo with all the samples - https://github.com/bijukunjummen/hystrixdemo

Sunday, October 25, 2015

Gentle Introduction to Hystrix - Hello World

In a previous blog post I had covered the motivation for needing a library like Netflix Hystrix. Here I will jump into some of the very basic ways to start using Hystrix and follow it up with more complex use cases.


Hello World


A simple Hello World example of a "Hystrix Command" is the following:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldCommand extends HystrixCommand<String> {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCommand.class);

    private final String name;

    public HelloWorldCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        logger.info("HelloWorld Command Invoked");
        return "Hello " + name;
    }
}


The run method holds any dependent activity that we want to be protected against, and ultimately returns the parameterized type - String in this specific instance. If you are a fan of the Netflix Rx-Java library, then another way to create the Hystrix command is the following:

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;

public class HelloWorldObservableCommand extends HystrixObservableCommand<String> {

    private String name;

    public HelloWorldObservableCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected Observable<String> resumeWithFallback() {
        return Observable.just("Returning a Fallback");
    }

    @Override
    protected Observable<String> construct() {
        return Observable.just("Hello " + this.name);
    }
}

Here "construct" method returns the Rx-java Observable.


Using a Hystrix Command

Now that we have a Hystrix command to wrap around our call, it can be used in a whole lot of different ways. Let us start with the simplest - a synchronous call:

HelloWorldCommand helloWorldCommand = new HelloWorldCommand("World");
assertEquals("Hello World", helloWorldCommand.execute());

Or, it can be made to return a Future:

HelloWorldCommand helloWorldCommand = new HelloWorldCommand("World");
Future<String> future = helloWorldCommand.queue();
assertEquals("Hello World", future.get());

Or, even better, it can be made to return an Rx-Java Observable:

HelloWorldCommand helloWorldCommand = new HelloWorldCommand("World");

CountDownLatch l = new CountDownLatch(1);

Observable<String> obs = helloWorldCommand.observe();
obs.subscribe(
        s -> logger.info("Received : " + s),
        t -> logger.error(t.getMessage(), t),
        () -> l.countDown()
);
l.await(5, TimeUnit.SECONDS);


The Observable variation of the command also works along the same lines, however we should contrast a small behavior difference:

HelloWorldObservableCommand helloWorldCommand = new HelloWorldObservableCommand("World");
logger.info("Completed executing HelloWorld Command");
Observable<String> obs = helloWorldCommand.observe();

There are two ways to obtain an Observable here - one is like the above, by making an ".observe()" call, and the other is by making a ".toObservable()" call:

HelloWorldObservableCommand helloWorldCommand = new HelloWorldObservableCommand("World");
Observable<String> obs = helloWorldCommand.toObservable();


The difference is that the ".observe()" method returns a Hot Observable which starts executing the "construct" method immediately, whereas the ".toObservable()" variation returns a Cold Observable which would not call the "construct" method unless it is subscribed to, say the following way:

CountDownLatch l = new CountDownLatch(1);
obs.subscribe(System.out::println, t -> l.countDown(), () -> l.countDown());
l.await();

I have more information here.

Note though that a Hystrix command is not a singleton - the typical way to use a Hystrix command is to construct it where it is required and dispose of it once done.

Fallback and Command Group Key

In the constructor of the HelloWorldCommand, I had called a super class constructor method with the following signature:

public HelloWorldCommand(String name) {
    super(HystrixCommandGroupKey.Factory.asKey("default"));
    this.name = name;
}

This parameter specifies a Hystrix "Command group" key. Along with the Command key - which by default is the simple name of the class - it controls a lot of the bells and whistles of Hystrix behavior. A sample of the properties is the following, and I will come back to the specifics of these later:

hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000
hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREAD
hystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000
hystrix.command.HelloWorldCommand.execution.isolation.semaphore.maxConcurrentRequests=10
hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50
hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20
hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000

hystrix.threadpool.default.coreSize=10
hystrix.threadpool.default.queueSizeRejectionThreshold=5

Another behavior we may want to control is the response in case the call to the dependent service fails; a fallback method provides this behavior. So consider a case where the dependent service always fails:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FallbackCommand extends HystrixCommand<String> {

    private static final String COMMAND_GROUP="default";
    private static final Logger logger = LoggerFactory.getLogger(FallbackCommand.class);


    public FallbackCommand() {
        super(HystrixCommandGroupKey.Factory.asKey(COMMAND_GROUP));
    }

    @Override
    protected String run() throws Exception {
        throw new RuntimeException("Always fail");
    }

    @Override
    protected String getFallback() {
        logger.info("About to fallback");
        return "Falling back";
    }
}

Here the dependent service call always fails and the response as shown in the following test will always be the response from the fallback method:

FallbackCommand fallbackCommand = new FallbackCommand();
assertEquals("Falling back", fallbackCommand.execute());


Monitoring

Before I wrap up the basics, it is good to demonstrate an awesome feature that Hystrix packs, in the form of the Hystrix stream and the Hystrix dashboard. Let us start with the Hystrix stream: if enabled, typically as a servlet in Java based web applications, it provides an SSE stream of real-time statistics about the behavior of the Hystrix commands present in the application.
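
For a servlet based Spring Boot application, a minimal way to expose this stream could be to register the servlet that comes with the hystrix-metrics-event-stream module - a sketch, assuming that dependency is on the classpath:

import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet;
import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HystrixStreamConfiguration {

    @Bean
    public ServletRegistrationBean hystrixStreamServlet() {
        // Exposes the SSE stream of command metrics at /hystrix.stream
        return new ServletRegistrationBean(new HystrixMetricsStreamServlet(), "/hystrix.stream");
    }
}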

Since my demo is based on a Karyon2 Rx-Netty based application, my configuration can be seen here. The information from the Hystrix stream is a little too raw though; this is where the awesome Hystrix dashboard fits in - it consumes the Hystrix stream and shows real-time aggregated information about how each of the Hystrix commands and the different underlying threadpools are performing. I have here a sample Hystrix dashboard project based on the awesome Spring-Cloud project. A sample dashboard is here:



Conclusion

This covers the Hystrix basics; there is a lot more to go. I will wrap this up in the next blog post with details on some of the advanced Hystrix features.

Sunday, October 18, 2015

Scala extractors infix sample with Rational numbers

I keep coming back to the awesome introductory material on Scala put together by Daniel Westheide. One of the examples that he provides for extractors using an infix operation pattern is the Streams API -

val xs = 58 #:: 43 #:: 93 #:: Stream.empty
xs match {
  case first #:: second #:: _ => first - second
  case _ => -1
}

where the extractor is defined this way:

object #:: {
  def unapply[A](xs: Stream[A]): Option[(A, Stream[A])] =
    if (xs.isEmpty) None
    else Some((xs.head, xs.tail))
}

Given this, I wanted to try an extractor on the Rational number samples from the Scala by Example book. This is how the Rational number looks:

class Rational(n: Int, d: Int) {
  private def gcd(x: Int, y: Int): Int = {
    if (x == 0) y
    else if (x < 0) gcd(-x, y)
    else if (y < 0) -gcd(x, -y)
    else gcd(y % x, x)
  }
  private val g = gcd(n, d)

  val numer: Int = n / g
  val denom: Int = d / g

  def +(that: Rational) =
    new Rational(numer * that.denom + that.numer * denom,
      denom * that.denom)

  def -(that: Rational) =
    new Rational(numer * that.denom - that.numer * denom,
      denom * that.denom)

  def *(that: Rational) =
    new Rational(numer * that.numer, denom * that.denom)

  def /(that: Rational) =
    new Rational(numer * that.denom, denom * that.numer)

  override def toString = "" + numer + "/" + denom + ""

  def square = new Rational(numer * numer, denom * denom)
}

and I wanted an extractor which would behave the following way:

val r = new Rational(2, 3)  

r match {
  case num / denom => num + "/" + denom
}   

This is absolutely feasible in Scala given the flexibility in the names of identifiers. An extractor called "/" can be defined the following way:

object / {
  def unapply(r: Rational): Option[(Int, Int)] = Some((r.numer, r.denom))
}

and used for extracting the numerator and denominator of the rational number!

r match {
 case /(num, denom) => num + "/" + denom
} 

//using infix
r match {
 case num / denom => num + "/" + denom
}                             

Sunday, October 11, 2015

Gentle Introduction to Hystrix - Motivation

In the last few days I have been exploring the Netflix Hystrix library and have come to appreciate the features provided by this excellent library.

To quote from the Hystrix site:

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

There are a whole lot of keywords to parse here, however the best way to experience Hystrix in my mind is to try out a sample use case.


An unpredictable Service

Consider a service, an odd one, which takes a json message of the following structure and returns an acknowledgement:


{
    "id":"1",
    "payload": "Sample Payload",
    "throw_exception":false,
    "delay_by": 0
}

The service takes in a payload, but additionally takes two fields - a "delay_by" field which makes the service acknowledge the response after the specified delay in milliseconds, and a "throw_exception" field which results in an exception after the specified delay!

Here is a sample response:

{
 "id":"1",
 "received":"Sample Payload",
 "payload":"Reply Message"
}


If you are following along, here is my github repo with this sample. I have used Netflix Karyon 2 for this sample, and the code which handles the request can be expressed very concisely the following way - see how the rx-java library is being put to good use here:

import com.netflix.governator.annotations.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import service1.domain.Message;
import service1.domain.MessageAcknowledgement;

import java.util.concurrent.TimeUnit;

public class MessageHandlerServiceImpl implements MessageHandlerService {

    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    @Configuration("reply.message")
    private String replyMessage;

    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        logger.info("About to Acknowledge");
        return Observable.timer(message.getDelayBy(), TimeUnit.MILLISECONDS)
                .map(l -> message.isThrowException())
                .map(throwException -> {
                    if (throwException) {
                        throw new RuntimeException("Throwing an exception!");
                    }
                    return new MessageAcknowledgement(message.getId(), message.getPayload(), replyMessage);
                });
    }

}


At this point we have a good candidate service which can be made to respond with an arbitrary delay and failure.


A client to the Service

Now on to a client for this service. I am using Netflix Feign to make this call - yet another awesome library. All it requires is a Java interface annotated the following way:

package aggregate.service;

import aggregate.domain.Message;
import aggregate.domain.MessageAcknowledgement;
import feign.RequestLine;

public interface RemoteCallService {
    @RequestLine("POST /message")
    MessageAcknowledgement handleMessage(Message message);
}

Feign creates the necessary proxy implementing this interface, using configuration along these lines:

RemoteCallService remoteCallService = Feign.builder()
        .encoder(new JacksonEncoder())
        .decoder(new JacksonDecoder())
        .target(RemoteCallService.class, "http://127.0.0.1:8889");


I have multiple endpoints which delegate calls to this remote client; all of them expose a URL pattern along these lines - http://localhost:8888/noHystrix?message=Hello&delay_by=0&throw_exception=false. This first one is an example where the endpoint does not use Hystrix.



No Hystrix Case


As a first example, consider calls to the remote service without Hystrix. If I were to try a call to http://localhost:8888/noHystrix?message=Hello&delay_by=5000&throw_exception=false, or say to http://localhost:8888/noHystrix?message=Hello&delay_by=5000&throw_exception=true, in both instances the user request to the endpoint will simply hang for 5 seconds before responding.


There should be a few things immediately apparent here:

1. If the service responds slowly, then the client requests to the service will be forced to wait for the response to come back.

2. Under heavy load it is very likely that all threads handling user traffic will be exhausted, thus failing further user requests.

3. If the service were to throw an exception, the client does not handle it gracefully.

Clearly there is a need for something like Hystrix which handles all these issues.

Hystrix command wrapping Remote calls
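
A command wrapping the Feign based remote call would look roughly along these lines - a sketch, the class and group names here are just for illustration:

public class RemoteCallCommand extends HystrixCommand<MessageAcknowledgement> {

    private final RemoteCallService remoteCallService;
    private final Message message;

    public RemoteCallCommand(RemoteCallService remoteCallService, Message message) {
        super(HystrixCommandGroupKey.Factory.asKey("demo"));
        this.remoteCallService = remoteCallService;
        this.message = message;
    }

    @Override
    protected MessageAcknowledgement run() throws Exception {
        // The protected remote call, made through the Feign client
        return this.remoteCallService.handleMessage(this.message);
    }

    @Override
    protected MessageAcknowledgement getFallback() {
        // Graceful response when the remote call fails or times out
        return new MessageAcknowledgement(message.getId(), message.getPayload(), "Fallback message");
    }
}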


I conducted a small load test using a 50 user load on the previous case and got a result along these lines:

================================================================================
---- Global Information --------------------------------------------------------
> request count                                         50 (OK=50     KO=0     )
> min response time                                   5007 (OK=5007   KO=-     )
> max response time                                  34088 (OK=34088  KO=-     )
> mean response time                                 17797 (OK=17797  KO=-     )
> std deviation                                       8760 (OK=8760   KO=-     )
> response time 50th percentile                      19532 (OK=19532  KO=-     )
> response time 75th percentile                      24386 (OK=24386  KO=-     )
> mean requests/sec                                  1.425 (OK=1.425  KO=-     )

Essentially a 5 second delay from the service results in a 75th percentile response time of nearly 25 seconds! Now consider the same test with a Hystrix command wrapping the service calls:

================================================================================
---- Global Information --------------------------------------------------------
> request count                                         50 (OK=50     KO=0     )
> min response time                                      1 (OK=1      KO=-     )
> max response time                                   1014 (OK=1014   KO=-     )
> mean response time                                    22 (OK=22     KO=-     )
> std deviation                                        141 (OK=141    KO=-     )
> response time 50th percentile                          2 (OK=2      KO=-     )
> response time 75th percentile                          2 (OK=2      KO=-     )
> mean requests/sec                                 48.123 (OK=48.123 KO=-     )

Strangely, the 75th percentile time now is 2 milliseconds! How is this possible? The answer becomes obvious using the excellent tools that Hystrix provides - here is a Hystrix dashboard view for this test:



What happened here is that the first 10 requests timed out - anything taking more than a second times out by default with a Hystrix command in place. Once the first ten transactions failed, Hystrix short-circuited the command, thus blocking any more requests to the remote service, and hence the low response times. As to why these transactions did not show up as failures - this is because there is a fallback in place which responds to the user request gracefully on failure.

Conclusion

The purpose here was to set up the motivation for why a library like Hystrix is required. I will follow this up with the specifics of what is needed to integrate Hystrix into an application and the breadth of features that this excellent library provides.

Saturday, September 26, 2015

Spring Cloud Sidecar - Initialization of Nodes

In the last blog post I had described how the Sidecar application can be used for registering the Cassandra nodes with Eureka and more generally can be used for registering any non-JVM application with Eureka.

In this post I will cover how an application can go about querying the Sidecar registered nodes.

Discovering Registered Nodes - Post Initialization

If the registered nodes are not required during the bean initialization phase then discovering the nodes is fairly straightforward along these lines:

@Component
public class SampleCommandLineRunner implements CommandLineRunner {

    @Autowired
    private DiscoveryClient discoveryClient;

    @PostConstruct
    public void postConstruct() {
//        System.out.println("Printing from postConstruct");
//        printDiscoveredNodes();
    }

    @Override
    public void run(String... strings) throws Exception {
        System.out.println("Printing from run method");
        printDiscoveredNodes();
    }

    public void printDiscoveredNodes() {
        System.out.println(" Printing Discovered Nodes ");

        for (ServiceInstance instance: discoveryClient.getInstances("samplecassandra.vip")) {
            System.out.println("Host: Port = " + instance.getHost() + ":" + instance.getPort());
        }
    }
}

This would print the nodes registered with the "samplecassandra.vip" VIP name.

Note that the nodes are being printed from the run method, which gets called after the Spring container has been initialized. If the nodes were instead listed from one of the lifecycle stages, say the postConstruct method, then very likely an exception would be thrown (this behavior is seen with the "Angel.SR3" release of Spring Cloud, but appears to work cleanly with the "Brixton.*" versions).

Discovering Registered Nodes - During Initialization

Now, if an application needs to discover the nodes during initialization, the flow is a little more complicated; for a potential issue look at this ticket.

The DiscoveryClient is initialized very late in the Spring Lifecycle and if DiscoveryClient is used in any post-processing activity of a bean it is likely to give an exception.

As an example, say the Cassandra nodes registered using Sidecar are now used by an application to initialize Cassandra connectivity. A way to do it would be to create a wrapper around Cassandra connectivity this way:

import org.springframework.data.cassandra.core.CassandraTemplate;


public class CassandraTemplateWrapper extends CassandraTemplate {

    @Override
    public void afterPropertiesSet() {
        // Deliberately a no-op: skip the base class check for an existing session,
        // since a session is injected much later in the startup lifecycle
    }
}

Here CassandraTemplate is being overridden to prevent the check in afterPropertiesSet method that a Cassandra session exists, as a session will be established much later in the start-up cycle.

A Cassandra session can be injected into this custom CassandraTemplate lazily in a bean that implements SmartLifecyle along these lines:

package mvctest.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.Ordered;
import org.springframework.data.cassandra.core.CassandraTemplate;
import org.springframework.stereotype.Component;

@Component("cassandraTemplate")
public class EurekaCassandraTemplateFactoryBean implements SmartLifecycle, FactoryBean<CassandraTemplate>, Ordered {

    ....


    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void start() {
        LOGGER.info("About to start Discovery client lookup of Cassandra Cluster!!!");
        final Cluster cluster = this.eurekaClusterBuilder.build();
        Session session = cluster.connect(this.cassandraProperties.getKeyspace());
        this.cassandraTemplateWrapper.setSession(session);
        LOGGER.info("Completed Discovery client lookup of Cassandra Cluster!!!");
        running = true;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    @Override
    public int getOrder() {
        return 1;
    }
}

This way the Cassandra session can be created very late in the cycle. Somewhat rough, but the approach works.

If you are interested in exploring this sample further I have this code available in my github repo here.

Wednesday, September 16, 2015

Spring Cloud Sidecar

I have an application deployed to a NetflixOSS based cloud which has a structure along these lines:

Essentially a service which persists information to a Cassandra cluster. All the applications are registered to Eureka - so in this instance the service as well as the Cassandra nodes are registered with Eureka, further the service connects to the Cassandra cluster by looking up the nodes via Eureka. 

I will deal with this in two parts - 

  1. Registering Cassandra nodes with Eureka
  2. Service using Eureka to connect to the Cassandra Cluster


Registering Cassandra Nodes with Eureka

This is where a Sidecar application fits in - the purpose of Sidecar is to facilitate some of the things that make an application a good citizen in a Cloud environment; in this specific instance it enables Cassandra to register with Eureka and respond to health checks. The Spring Cloud Netflix Sidecar project provides the necessary support to create a Sidecar application.

The amount of coding required to get a Sidecar application up and running is very minimal. Sidecar behaves like a typical Spring Cloud application, except that instead of registering itself with Eureka it has to register another application, so the configuration is mostly the same.

This is my entire code for the Sidecar application!:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.sidecar.EnableSidecar;

@SpringBootApplication
@EnableSidecar
public class SampleSidecarApplication {

    public static void main(String[] args) {
        SpringApplication.run(SampleSidecarApplication.class, args);
    }
}



and the properties to go with this:

application.yml
eureka:
  instance:
    virtual-host-name: samplecassandra.vip

spring:
  application:
    name: samplecassandra

sidecar:
  port: 9042

Here the port is declared to be the port that is relevant to Cassandra.

There is one more aspect to handle - the healthcheck. The Sidecar exposes an endpoint which can test the health of the supported application in whatever way makes sense for that application. For Cassandra it could be connecting to the local node and firing a small CQL query.
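
As a sketch of what such a check could look like (the endpoint path and the query here are just an illustration), the Sidecar application could expose a small endpoint of its own that connects to the local node, and the sidecar health-uri can be pointed at it:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collections;
import java.util.Map;

@RestController
public class CassandraHealthCheckController {

    @RequestMapping("/cassandra/health")
    public Map<String, String> health() {
        // Connect to the local Cassandra node and fire a trivial CQL query
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {
            session.execute("SELECT release_version FROM system.local");
            return Collections.singletonMap("status", "UP");
        } catch (RuntimeException e) {
            return Collections.singletonMap("status", "DOWN");
        }
    }
}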

Conclusion

Assuming that the Cassandra nodes are now registered with Eureka, there is a good level of complication in trying to create a Cassandra Session on the consuming service side. This is mainly because of the timing involved in instantiating the Eureka client and the point at which the code tries to find the list of nodes. I will cover this in a subsequent post. If you would like to explore this sample further, here is the github repo.

Wednesday, September 9, 2015

Rest client calls with Spring Cloud

There are a few interesting ways to make REST client calls with the Spring-Cloud project.
Spring-Cloud rest support builds on top of the core Netflix OSS libraries, but abstracts them and in the process simplifies using the libraries.

RestTemplate

As a first step let us consider the traditional way to make Rest calls through Spring based applications, using RestTemplate:

public class RestTemplateIntegrationTest {

    @Autowired
    private RestTemplate restTemplate;

    @Test
    public void testCallPongService() {
        ResponseEntity<MessageAcknowledgement> ack =
                restTemplate.exchange("http://servicehost/message",
                        HttpMethod.POST,
                        new HttpEntity<>(new Message("test", "hello")),
                        MessageAcknowledgement.class,
                        Collections.emptyMap());
        assertThat(ack.getBody().getPayload(), equalTo("Pong From Configuration Server"));
    }
}

In this specific instance, the host part of the URL is expected to be completely known to the client; RestTemplate takes care of marshalling the Java object to the appropriate media type, making the REST call, and unmarshalling the response back to a Java object.

RestTemplate with Ribbon and Eureka

Netflix Ribbon provides a library for making REST based calls. Whereas with a plain RestTemplate the host is expected to be completely known to the client, with Ribbon the host is typically resolved through the centralized Netflix Eureka server, and Ribbon takes care of load-balancing the calls if multiple hosts are found for a service. If the Spring-Cloud and Ribbon related libraries are present in the classpath, then Spring-Cloud enhances RestTemplate to be based on Ribbon instead, with no additional configuration required. With Spring-Cloud in place the call would look exactly like before, with a few twists.

ResponseEntity<MessageAcknowledgement> ack =
        restTemplate.exchange("http://sample-pong/message",
                HttpMethod.POST,
                new HttpEntity<>(new Message("test", "hello")),
                MessageAcknowledgement.class,
                Collections.emptyMap());

The twist is that the hostname, which in this instance is "sample-pong", is significant - it is not the real host name. Instead, an attempt is made to find the list of servers registered in Eureka with this name, and the resulting host/port is used for making the request.


If customizations are required a named client can be provided with Ribbon specific properties specified for the named client, along these lines:

ResponseEntity<MessageAcknowledgement> ack =
        restTemplate.exchange("http://samplepong/message",
                HttpMethod.POST,
                new HttpEntity<>(new Message("test", "hello")),
                MessageAcknowledgement.class,
                Collections.emptyMap());

The named client above is "samplepong" and the Ribbon specific properties for this client are along these lines:

samplepong:
  ribbon:
    DeploymentContextBasedVipAddresses: sample-pong
    NIWSServerListClassName: com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
    ReadTimeout: 5000
    MaxAutoRetries: 2

If you are interested in more low-level configuration for Ribbon, refer here.

Ribbon is a fairly complicated, low-level way of making a REST call; RestTemplate abstracts the Ribbon implementation and makes it look easy from a client's perspective.

Netflix Feign

Netflix Feign is another simplified way to make calls to REST based services, all it requires is an interface with relevant annotations which is best demonstrated with an example:

import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

@FeignClient("samplepong")
public interface PongClient {

    @RequestMapping(method = RequestMethod.POST, value = "/message",
            produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    MessageAcknowledgement sendMessage(@RequestBody Message message);
}

The annotations here are Spring specific though - Spring-Cloud facilitates this by adding in encoders and decoders which support the Spring MVC annotations.

The @FeignClient annotation on the interface identifies it as a Feign client. @EnableFeignClients is required in a Spring configuration to load up all such Feign clients.

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
public class PingApplication {

    public static void main(String[] args) {
        SpringApplication.run(PingApplication.class, args);
    }
}
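
Once scanned in, the Feign client can be injected and used like any other Spring bean - a small usage sketch (the controller here is made up for illustration):

import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PingController {

    private final PongClient pongClient;

    @Autowired
    public PingController(PongClient pongClient) {
        this.pongClient = pongClient;
    }

    @RequestMapping(method = RequestMethod.POST, value = "/message")
    public MessageAcknowledgement sendMessage(@RequestBody Message message) {
        // Delegates to the Feign generated proxy, which resolves "samplepong" through Eureka/Ribbon
        return this.pongClient.sendMessage(message);
    }
}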



Friday, August 28, 2015

Couchbase Java SDK with Rx-Java

A neat thing about the Couchbase Java SDK is that it is built on top of the excellent Rx-Java library; this enables a reactive way to interact with a Couchbase server instance, which is very intuitive once you get the hang of it.

Consider a very simple json document that I intend to store in Couchbase:

{"key":"1","value":"one"}


and a Java class to hold this json:

public class KeyVal {
    private String key;
    private String value;

    ...
}

The following is the code to insert an instance of KeyVal to a Couchbase bucket:

JsonObject jsonObject = JsonObject.empty().put("key", keyVal.getKey()).put("value", keyVal.getValue());
JsonDocument doc = JsonDocument.create(keyVal.getKey(), jsonObject);
Observable<JsonDocument> obs = bucket
                .async()
                .insert(doc);

The return type of the insert is an Observable, so if I needed to map the return type back to a KeyVal I can use the extensive mapping support provided by the Observable class.

Observable<KeyVal> obs = bucket
                .async()
                .insert(doc)
                .map(jsonDoc -> 
                    new KeyVal(jsonDoc.id(), jsonDoc.content().getString("value"))
                );



Other APIs follow a similar pattern, for example to retrieve the saved document:

bucket
                .async()
                .get(id)
                .map(doc ->
                        new KeyVal(doc.id(),
                                doc.content().getString("value")));

If you are interested in exploring this sample further, here is my github repo with a working example - https://github.com/bijukunjummen/sample-karyon2-couch



Tuesday, August 18, 2015

Spring boot static web resource handling for Single Page Applications

Javascript build tools like gulp and grunt truly boggle my mind - I look at one of the build scripts for these tools and find it difficult to get my head around it, and cannot imagine writing one of these build scripts from scratch. This is where yeoman comes in, a very handy tool that quickly allows one to bootstrap a good starter project using any of a myriad of combinations of javascript build tools.

I wanted to explore an approach which Spring framework recommends for handling static web resources, which is to use these very capable build tools for building the static assets and using Spring for serving out the content once the static assets are built into a distributable state.

My approach was to use yeoman to generate a starter project, I chose the gulp-angular as my base and quickly generated a complete project, available here. I was able to expand this template into a fairly comprehensive angularjs based single page application which delegates back to Spring based REST calls to service the UI.

The steps that I followed are the following, mostly copied from the excellent sample created by Brian Clozel:

If you want to follow along the end result is available in my github repo.


  1. Define two modules, the "client" module to hold the generated yeoman template and a "server" module to hold the Spring Boot application.
  2. Hack away on the "client" module, in this specific instance I have created a simple angularjs based application
  3. I am using maven as the java build tool so I have a wrapper maven pom file which triggers the javascript build chain as part of the maven build cycle, then takes the built artifacts and creates a client jar out of it. The static content is cleverly placed at a location that Spring boot can get to, in this instance at the classpath:/static location.
  4. In the "server" module client is added as a dependency and "server" is set to be run as a full-fledged spring-boot project
  5. Serve out the project from the server module by executing:
    mvn spring-boot:run
    

Conclusion


Spring Boot has taken an excellent approach to providing an asset pipeline for static web resources which is to not interfere with the very capable build tools in the Javascript universe and providing a clean way to serve out the generated static content.

Saturday, August 8, 2015

Working with Spring-Cloud and Netflix Archaius

Background

Spring Cloud provides all the tools that you require to create cloud-ready microservices. One of the infrastructure components that Spring-Cloud provides is a configuration server to centralize the properties of an application; however it is possible that you may be using other solutions to manage the properties. One such solution is Netflix Archaius, and if you work with Netflix Archaius there is a neat way that Spring-Cloud provides to integrate with it.

Integration With Archaius

Spring Cloud provides a Spring Boot Auto-configuration for Archaius which gets triggered on finding the Archaius related libraries in the classpath of the application. So the first step is to pull in the Archaius libraries, which can be done through the following dependency entry in the POM file:
<dependency>
    <groupId>com.netflix.archaius</groupId>
    <artifactId>archaius-core</artifactId>
</dependency>
Note that the version of the dependency need not be specified; this information flows in from the dependency management information in the parent POMs.
With this new library in place, all that now needs to be done to configure Archaius is to define Spring beans which extend the Apache Commons Configuration AbstractConfiguration class, and these would automatically get picked up by Spring Cloud. As an example consider the following AbstractConfiguration which has one property in it:
@Bean
public AbstractConfiguration sampleArchaiusConfiguration() throws Exception {
    ConcurrentMapConfiguration concurrentMapConfiguration = new ConcurrentMapConfiguration();
    concurrentMapConfiguration.addProperty("testkey", "testvalue");
    return concurrentMapConfiguration;
}

That is essentially it; this property should now be visible as an Archaius property and can be accessed along these lines:

DynamicPropertyFactory.getInstance().getStringProperty("testkey", "").get()


Also there are a few more neat features provided through the Archaius integration in Spring-Cloud:
  1. The Spring managed properties are visible as Archaius properties (a small sketch of this follows the list)
  2. An endpoint (/archaius) is provided by Spring-Cloud where all the registered Archaius properties can be viewed
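
To illustrate the first point, a property defined in the normal Spring configuration (say in application.yml) should be retrievable through the Archaius API as well. A minimal sketch, assuming a hypothetical key called "message.greeting" in the Spring environment:

// "message.greeting" is a hypothetical key assumed to be present in application.yml;
// with the auto-configuration in place the same key resolves through Archaius too
String greeting = DynamicPropertyFactory.getInstance()
        .getStringProperty("message.greeting", "default").get();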

Conclusion

Spring Cloud natively provides all the tools to write a Cloud Ready microservice; however, it is possible that the way to configure the centralized properties may be via Netflix Archaius, and if that is the case Spring Cloud enables this neat way to integrate with Archaius.

Tuesday, July 28, 2015

Spring Boot @ConfigurationProperties

Spring Boot provides a very neat way to load properties for an application. Consider a set of properties described using yaml format:

prefix:
    stringProp1: propValue1
    stringProp2: propValue2
    intProp1: 10
    listProp:
        - listValue1
        - listValue2
    mapProp:
        key1: mapValue1
        key2: mapValue2

These entries can also be described in a traditional application.properties file the following way:

prefix.stringProp1=propValue1
prefix.stringProp2=propValue2
prefix.intProp1=10
prefix.listProp[0]=listValue1
prefix.listProp[1]=listValue2
prefix.mapProp.key1=mapValue1
prefix.mapProp.key2=mapValue2

It has taken me a little while, but I do like the hierarchical look of the properties described in a yaml format.

So now, given this property file a traditional Spring application would have loaded up the properties the following way:

public class SamplePropertyLoadingTest {
    @Value("${prefix.stringProp1}")
    private String stringProp1;

Note the placeholder for the "prefix.stringProp1" key.

This however is not ideal for loading a family of related properties, say in this specific case namespaced by the prefix conveniently named "prefix".

The approach Spring Boot takes is to define a bean that can hold the whole family of related properties, this way:

@ConfigurationProperties(prefix = "prefix")
@Component
public class SampleProperty {
    private String stringProp1;
    private String stringProp2;
    @Max(99)
    @Min(0)
    private Integer intProp1;
    private List<String> listProp;
    private Map<String, String> mapProp;
    
    ...
}

At runtime, all the fields would be bound to the related properties cleanly.

Additionally, note the JSR-303 annotations on the "intProp1" field that validate that the value of the field is between 0 and 99; @ConfigurationProperties triggers validation to ensure that the bound bean is valid.

An integration test making use of this is the following:

package prop;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SampleWebApplication.class)
public class SamplePropertyLoadingTest {
    @Autowired
    private SampleProperty sampleProperty;

    @Value("${prefix.stringProp1}")
    private String stringProp1;

    @Test
    public void testLoadingOfProperties() {
        System.out.println("stringProp1 = " + stringProp1);
        assertThat(sampleProperty.getStringProp1(), equalTo("propValue1"));
        assertThat(sampleProperty.getStringProp2(), equalTo("propValue2"));
        assertThat(sampleProperty.getIntProp1(), equalTo(10));
        assertThat(sampleProperty.getListProp(), hasItems("listValue1", "listValue2"));
        assertThat(sampleProperty.getMapProp(), allOf(hasEntry("key1", "mapValue1"),
                hasEntry("key2", "mapValue2")));
    }
}

If you are interested in exploring this sample further, I have a github repo with the code checked in here.

Thursday, July 23, 2015

Scatter Gather - Using Java 8 CompletableFuture and Rx-Java Observable

I wanted to explore a simple scatter-gather scenario using Java 8 CompletableFuture and using Rx-Java Observable.


The scenario is simple - spawn about 10 tasks, each returning a string, and ultimately collect the results into a list.

Sequential

A sequential version of this would be the following:

public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

 logger.info(list.toString());
}

private String generateTask(int i) {
 Util.delay(2000);
 return i + "-" + "test";
}
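
Util.delay referenced above is not shown in the snippet; it is just a small helper that sleeps for the given number of milliseconds to simulate a slow task, roughly along these lines (a sketch - the actual utility in the sample may differ):

public class Util {
    // Sleep for the given milliseconds to simulate a slow task, keeping the
    // calling code free of the checked InterruptedException
    public static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}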

With CompletableFuture

A method can be made to return a CompletableFuture using the utility method supplyAsync. I am using a variation of this method which accepts an explicit Executor to use, and I am also deliberately throwing an exception for one of the inputs:

private CompletableFuture<String> generateTask(int i,
  ExecutorService executorService) {
 return CompletableFuture.supplyAsync(() -> {
  Util.delay(2000);
  if (i == 5) {
   throw new RuntimeException("Run, it is a 5!");
  }
  return i + "-" + "test";
 }, executorService);
}

Now to scatter the tasks:

List<CompletableFuture<String>> futures =
  IntStream.range(0, 10)
    .boxed()
    .map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage()))
    .collect(Collectors.toList());

At the end of scattering the tasks, the result is a list of CompletableFutures. Now, obtaining the list of Strings from this is a little tricky; here I am using one of the solutions suggested on Stackoverflow:

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
  .thenApply(v -> futures.stream()
       .map(CompletableFuture::join)
       .collect(Collectors.toList()));

The CompletableFuture.allOf method is being used here purely to compose the next action to take once all the scattered tasks are completed; once they complete, the futures are streamed again and collected into a list of Strings.

The final result can then be presented asynchronously:
result.thenAccept(l -> {
 logger.info(l.toString());
});
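
If a blocking result is needed, say in a test, the same CompletableFuture can simply be joined on instead:

// join() waits for the composed future to complete and returns the assembled list -
// the thenAccept variant above is the non-blocking way to consume it
List<String> results = result.join();
logger.info(results.toString());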


With Rx-java Observable

Scatter gather with Rx-java is relatively cleaner than the CompletableFuture version, as Rx-java provides better ways to compose the results together. Again, the method which performs the scattered task:

private Observable<String> generateTask(int i, ExecutorService executorService) {
    return Observable
            .<String>create(s -> {
                Util.delay(2000);
                if ( i == 5) {
                    throw new RuntimeException("Run, it is a 5!");
                }
                s.onNext( i + "-test");
                s.onCompleted();
            }).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}

and to scatter the tasks:

List<Observable<String>> obs =
        IntStream.range(0, 10)
            .boxed()
            .map(i -> generateTask(i, executors)).collect(Collectors.toList());

Once more I have a List of Observables, and what I need is a List of results; Observable provides a merge method to do just that:

Observable<List<String>> merged = Observable.merge(obs).toList();

which can be subscribed to and the results printed when available:

merged.subscribe(
                l -> logger.info(l.toString()));
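
And as with the CompletableFuture version, a test can simply block for the final list instead of subscribing, for instance:

// toBlocking() is used here purely for test convenience
List<String> results = merged.toBlocking().single();
logger.info(results.toString());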

Friday, July 10, 2015

Rx-netty and Karyon2 based cloud ready microservice - Dependency Injection

I had previously written about using Rx-netty and Karyon2 for developing cloud ready microservices; there were a few issues with that sample though, partly reproduced here:

package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

The issues are:

  1. The routing logic is not centralized; the request handler has both the routing logic and the processing logic
  2. The dependencies are not injected cleanly.


Looking at the Karyon2 samples, both of these issues are actually addressed very cleanly now, which I wanted to document here.

Routing

Routing can be centralized using a custom Rx-netty RequestHandler called the SimpleUriRouter.
The routes can be registered the following way using the SimpleUriRouter, which is being created here using a Guice Provider:

import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.buffer.ByteBuf;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.transport.http.SimpleUriRouter;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.app.ApplicationMessageHandler;
import org.bk.samplepong.common.health.HealthCheck;

public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> {

    @Inject
    private HealthCheck healthCheck;

    @Inject
    private ApplicationMessageHandler applicationMessageHandler;

    @Override
    public SimpleUriRouter get() {
        SimpleUriRouter simpleUriRouter = new SimpleUriRouter();
        simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck));
        simpleUriRouter.addUri("/message", applicationMessageHandler);
        return simpleUriRouter;
    }
}

This router can now be registered via a custom guice module the following way:

public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> {

    public KaryonAppModule() {
        super("routerModule", ByteBuf.class, ByteBuf.class);
    }

    @Override
    protected void configureServer() {
        bindRouter().toProvider(new AppRouteProvider());

        interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class);

        server().port(8888);
    }
}

This is essentially it, now the routing logic is cleanly separated from the processing logic.

Dependency Injection


Dependency injection is handled via custom guice modules. I have a service, call it the MessageHandlerService, which takes in a message and returns an acknowledgement; this service is defined as follows:

public class MessageHandlerServiceImpl implements MessageHandlerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        return Observable.<MessageAcknowledgement>create(s -> {
            s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong"));
            s.onCompleted();
        });
    }


}
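
The MessageHandlerService contract itself is a single method returning an Observable, along these lines (a sketch of the interface implied by the implementation above):

public interface MessageHandlerService {
    // Accept a message and asynchronously return an acknowledgement
    Observable<MessageAcknowledgement> handleMessage(Message message);
}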

Now, I have a guice module which specifies the binding between MessageHandlerService interface and the concrete MessageHandlerServiceImpl:

public class AppModule extends AbstractModule {


    @Override
    protected void configure() {
        bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON);
    }
}


With this in place, the MessageHandlerService can be injected in:

public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final MessageHandlerService messageHandlerService;

    @Inject
    public ApplicationMessageHandler(MessageHandlerService messageHandlerService) {
        this.messageHandlerService = messageHandlerService;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -> {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .flatMap(messageHandlerService::handleMessage)
                .flatMap(ack -> {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    }
}


With both these features implemented, the app using Karyon2 is also greatly simplified and I have the complete working app in my github repository here: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong

Monday, June 29, 2015

Learning Spring-Cloud - Writing a microservice

Continuing my Spring-Cloud learning journey, earlier I had covered how to write the infrastructure components of a typical Spring-Cloud and Netflix OSS based micro-services environment - in this specific instance two critical components, Eureka to register and discover services and Spring Cloud Configuration to maintain a centralized repository of configuration for a service. Here I will be showing how I developed two dummy micro-services, one a simple "pong" service and a "ping" service which uses the "pong" service.


Sample-Pong microservice


The endpoint handling the "ping" requests is a typical Spring MVC based endpoint:

@RestController
public class PongController {

    @Value("${reply.message}")
    private String message;

    @RequestMapping(value = "/message", method = RequestMethod.POST)
    public Resource<MessageAcknowledgement> pongMessage(@RequestBody Message input) {
        return new Resource<>(
                new MessageAcknowledgement(input.getId(), input.getPayload(), message));
    }

}

It gets a message and responds with an acknowledgement. Here the service utilizes the Configuration server for sourcing the "reply.message" property. So how does the "pong" service find the configuration server? There are potentially two ways - directly by specifying the location of the configuration server, or by finding the Configuration server via Eureka. I am used to an approach where Eureka is considered the source of truth, so in this spirit I am using Eureka to find the Configuration server. Spring Cloud makes this entire flow very simple; all it requires is a "bootstrap.yml" property file with entries along these lines:

---
spring:
  application:
    name: sample-pong
  cloud:
    config:
      discovery:
        enabled: true
        serviceId: SAMPLE-CONFIG

eureka:
  instance:
    nonSecurePort: ${server.port:8082}
  client:
    serviceUrl:
      defaultZone: http://${eureka.host:localhost}:${eureka.port:8761}/eureka/

The location of Eureka is specified through the "eureka.client.serviceUrl" property and the "spring.cloud.config.discovery.enabled" is set to "true" to specify that the configuration server is discovered via the specified Eureka server.

Just a note: this means that Eureka and the Configuration server have to be completely up before trying to bring up the actual services; they are pre-requisites, and the underlying assumption is that the infrastructure components are available at application boot time.

The Configuration server has the properties for the "sample-pong" service; this can be validated by using the Config-server's endpoint - http://localhost:8888/sample-pong/default (8888 is the port that I had specified for the server endpoint), which should respond with content along these lines:

"name": "sample-pong",
  "profiles": [
    "default"
  ],
  "label": "master",
  "propertySources": [
    {
      "name": "classpath:/config/sample-pong.yml",
      "source": {
        "reply.message": "Pong"
      }
    }
  ]
}

As can be seen, the "reply.message" property from this central configuration server will be used by the pong service as the acknowledgement message.

Now to set up this endpoint as a service, all that is required is a Spring-boot based entry point along these lines:

@SpringBootApplication
@EnableDiscoveryClient
public class PongApplication {
    public static void main(String[] args) {
        SpringApplication.run(PongApplication.class, args);
    }
}

and that completes the code for the "pong" service.


Sample-ping micro-service


So now onto a consumer of the "pong" micro-service, very imaginatively named the "ping" micro-service. Spring-Cloud and Netflix OSS offer a lot of options to invoke endpoints on Eureka registered services; to summarize the options that I had:

1. Use the raw Eureka DiscoveryClient to find the instances hosting a service and make calls using Spring's RestTemplate (a quick sketch of this approach follows the list).

2. Use Ribbon, a client side load balancing solution which can use Eureka to find service instances

3. Use Feign, which provides a declarative way to invoke a service call. It internally uses Ribbon.
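
For completeness, the first option would look roughly like this - a sketch, assuming a DiscoveryClient and a plain RestTemplate are injected in, and using the "sample-pong" Eureka registration name:

// Option 1 sketch - look up an instance of the pong service via the raw
// DiscoveryClient and invoke it with a plain RestTemplate
ServiceInstance instance = discoveryClient.getInstances("sample-pong").get(0);
MessageAcknowledgement ack = restTemplate.postForObject(
        instance.getUri().toString() + "/message", message, MessageAcknowledgement.class);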

I went with Feign. All that is required is an interface which shows the contract to invoke the service:

package org.bk.consumer.feign;

import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

@FeignClient("samplepong")
public interface PongClient {

    @RequestMapping(method = RequestMethod.POST, value = "/message",
            produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    MessageAcknowledgement sendMessage(@RequestBody Message message);
}

The annotation @FeignClient("samplepong") internally points to a Ribbon "named" client called "samplepong". This means that there has to be an entry in the property files for this named client; in my case I have these entries in my application.yml file:

samplepong:
  ribbon:
    DeploymentContextBasedVipAddresses: sample-pong
    NIWSServerListClassName: com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
    ReadTimeout: 5000
    MaxAutoRetries: 2

The most important entry here is "samplepong.ribbon.DeploymentContextBasedVipAddresses" which points to the "pong" service's Eureka registration address, using which the service instances will be discovered by Ribbon.

The rest of the application is a routine Spring Boot application. I have exposed this service call behind Hystrix which guards against service call failures and essentially wraps around this FeignClient:

package org.bk.consumer.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.bk.consumer.feign.PongClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service("hystrixPongClient")
public class HystrixWrappedPongClient implements PongClient {

    @Autowired
    @Qualifier("pongClient")
    private PongClient feignPongClient;

    @Override
    @HystrixCommand(fallbackMethod = "fallBackCall")
    public MessageAcknowledgement sendMessage(Message message) {
        return this.feignPongClient.sendMessage(message);
    }

    public MessageAcknowledgement fallBackCall(Message message) {
        MessageAcknowledgement fallback = new MessageAcknowledgement(message.getId(), message.getPayload(), "FAILED SERVICE CALL! - FALLING BACK");
        return fallback;
    }
}
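
One piece not shown above is the entry point of the "ping" application; for the Feign client and the Hystrix command to be picked up, the Spring Boot application class needs the corresponding features enabled, roughly along these lines (a sketch - the exact set of annotations in the sample may differ slightly):

// Sketch of the "ping" application's entry point
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableCircuitBreaker
public class PingApplication {
    public static void main(String[] args) {
        SpringApplication.run(PingApplication.class, args);
    }
}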


"Boot"ing up


I have dockerized my entire set-up, so the simplest way to start up the set of applications is to first build the docker images for all of the artifacts this way:

mvn clean package docker:build -DskipTests

and bring all of them up using the following command, the assumption being that both docker and docker-compose are available locally:

docker-compose up

Assuming everything comes up cleanly, Eureka should show all the registered services at the http://dockerhost:8761 url.

The UI of the ping application should be available at the http://dockerhost:8080 url.

Additionally, a Hystrix dashboard should be available to monitor the requests to the "pong" app at this url: http://dockerhost:8989/hystrix/monitor?stream=http%3A%2F%2Fsampleping%3A8080%2Fhystrix.stream

References


1. The code is available at my github location - https://github.com/bijukunjummen/spring-cloud-ping-pong-sample

2. Most of the code is heavily borrowed from the spring-cloud-samples repository - https://github.com/spring-cloud-samples