Sunday, October 25, 2015

Gentle Introduction to Hystrix - Hello World

In a previous blog post I covered the motivation for a library like Netflix Hystrix. Here I will jump into some of the most basic ways to start using Hystrix, and follow up with more complex use cases.


Hello World


A simple Hello World example of a "Hystrix Command" is the following:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldCommand extends HystrixCommand<String> {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCommand.class);

    private final String name;

    public HelloWorldCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        logger.info("HelloWorld Command Invoked");
        return "Hello " + name;
    }
}


The run method holds any dependent activity that we want to be protected against, and ultimately returns the parameterized type, String in this specific instance. If you are a fan of the Netflix RxJava library, then another way to create the Hystrix command is the following:

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;

public class HelloWorldObservableCommand extends HystrixObservableCommand<String> {

    private final String name;

    public HelloWorldObservableCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected Observable<String> resumeWithFallback() {
        return Observable.just("Returning a Fallback");
    }

    @Override
    protected Observable<String> construct() {
        return Observable.just("Hello " + this.name);
    }
}

Here the "construct" method returns an RxJava Observable, and "resumeWithFallback" supplies the Observable to use if the execution fails.


Using a Hystrix Command

Now that we have a Hystrix command to wrap around our call, it can be used in a whole lot of different ways. Let us start with the simplest, a synchronous call:

HelloWorldCommand helloWorldCommand = new HelloWorldCommand("World");
assertEquals("Hello World", helloWorldCommand.execute());

Or it can return a Future:

HelloWorldCommand helloWorldCommand = new HelloWorldCommand("World");
Future<String> future = helloWorldCommand.queue();
assertEquals("Hello World", future.get());

Or, even better, it can return an RxJava Observable:

HelloWorldCommand helloWorldCommand = new HelloWorldCommand("World");

CountDownLatch l = new CountDownLatch(1);

Observable<String> obs = helloWorldCommand.observe();
obs.subscribe(
        s -> logger.info("Received : " + s),
        t -> logger.error(t.getMessage(), t),
        () -> l.countDown()
);
l.await(5, TimeUnit.SECONDS);


The Observable variation of the command works along the same lines, but there is a small behavioral difference worth contrasting:

HelloWorldObservableCommand helloWorldCommand = new HelloWorldObservableCommand("World");
Observable<String> obs = helloWorldCommand.observe();
logger.info("Completed executing HelloWorld Command");

There are two ways to obtain an Observable here. One is the ".observe()" call shown above; the other is a ".toObservable()" call:

HelloWorldObservableCommand helloWorldCommand = new HelloWorldObservableCommand("World");
Observable<String> obs = helloWorldCommand.toObservable();

The difference is that ".observe()" returns a hot Observable, which starts executing the "construct" method immediately, whereas ".toObservable()" returns a cold Observable, which does not call "construct" until it is subscribed to, for example this way:

CountDownLatch l = new CountDownLatch(1);
obs.subscribe(System.out::println, t -> l.countDown(), () -> l.countDown());
l.await();
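The hot/cold distinction can be verified with a counter. The following is a minimal sketch, assuming Hystrix with RxJava 1.x (the rx.Observable used above) on the classpath; CountingObservableCommand is a hypothetical class, not part of the sample project, that records how many times construct() runs.

```java
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical command that counts invocations of construct(), so that the eager
// behavior of observe() can be compared with the lazy behavior of toObservable().
class CountingObservableCommand extends HystrixObservableCommand<String> {

    private final AtomicInteger constructCalls;

    CountingObservableCommand(AtomicInteger constructCalls) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.constructCalls = constructCalls;
    }

    @Override
    protected Observable<String> construct() {
        constructCalls.incrementAndGet(); // record that the command actually ran
        return Observable.just("Hello World");
    }
}
```

Calling `observe()` on one instance should bump its counter to 1 before anyone subscribes, whereas `toObservable()` leaves the counter at 0 until something subscribes, for example with `obs.toBlocking().single()`.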

I have more information here.

Note, though, that a Hystrix command is not a singleton: the typical way to use one is to construct it where it is required and discard it once done, since an instance can be executed only once.
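A quick way to see the single-use rule in action is to execute the same instance twice. This is a sketch with a hypothetical GreetingCommand; the exact exception Hystrix raises on reuse has differed between versions, so the check only looks for a failure rather than a specific type.

```java
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

// Hypothetical minimal command, used only to show that an instance is single-use.
class GreetingCommand extends HystrixCommand<String> {

    private final String name;

    GreetingCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected String run() {
        return "Hello " + name;
    }
}

class SingleUseDemo {

    // Returns true if a second execute() on the same instance is rejected.
    static boolean secondExecutionRejected() {
        GreetingCommand command = new GreetingCommand("World");
        command.execute(); // first execution succeeds
        try {
            command.execute(); // reusing the instance is not permitted
            return false;
        } catch (Exception e) {
            return true;
        }
    }
}
```

The fix is simply to create a new GreetingCommand for every call.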

Fallback and Command Group Key

In the constructor of HelloWorldCommand, I called the superclass constructor this way:

public HelloWorldCommand(String name) {
    super(HystrixCommandGroupKey.Factory.asKey("default"));
    this.name = name;
}

This parameter specifies a Hystrix "Command Group" key. Together with the Command Key, which by default is the simple name of the class, it controls a lot of the bells and whistles of Hystrix behavior. A sample of the properties follows; I will come back to the specifics of these later:

hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000
hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREAD
hystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000
hystrix.command.HelloWorldCommand.execution.isolation.semaphore.maxConcurrentRequests=10
hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50
hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20
hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000

hystrix.threadpool.default.coreSize=10
hystrix.threadpool.default.queueSizeRejectionThreshold=5
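The same values can also be supplied as per-command defaults in code, through HystrixCommand.Setter. A sketch, assuming Hystrix 1.4 or later (where the timeout setter is named withExecutionTimeoutInMilliseconds); ConfiguredHelloWorldCommand is a hypothetical variant of HelloWorldCommand:

```java
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;

// Hypothetical command that sets the properties listed above as code-level defaults.
class ConfiguredHelloWorldCommand extends HystrixCommand<String> {

    private final String name;

    ConfiguredHelloWorldCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("default"))
                // explicit command key; without it Hystrix uses the simple class name
                .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorldCommand"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(
                                HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                        .withExecutionTimeoutInMilliseconds(1000)
                        .withCircuitBreakerErrorThresholdPercentage(50)
                        .withCircuitBreakerRequestVolumeThreshold(20)
                        .withCircuitBreakerSleepWindowInMilliseconds(5000))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withQueueSizeRejectionThreshold(5)));
        this.name = name;
    }

    @Override
    protected String run() {
        return "Hello " + name;
    }
}
```

Values set through the Setter act as defaults for that command; instance-specific properties such as hystrix.command.HelloWorldCommand.* from the listing above take precedence over them.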

Another behavior we may want to control is the response when the call to the dependent service fails. A fallback method provides this behavior; consider a case where the dependent service always fails:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FallbackCommand extends HystrixCommand<String> {

    private static final String COMMAND_GROUP="default";
    private static final Logger logger = LoggerFactory.getLogger(FallbackCommand.class);


    public FallbackCommand() {
        super(HystrixCommandGroupKey.Factory.asKey(COMMAND_GROUP));
    }

    @Override
    protected String run() throws Exception {
        throw new RuntimeException("Always fail");
    }

    @Override
    protected String getFallback() {
        logger.info("About to fallback");
        return "Falling back";
    }
}

Here the dependent service call always fails, so, as the following test shows, the response is always the one from the fallback method:

FallbackCommand fallbackCommand = new FallbackCommand();
assertEquals("Falling back", fallbackCommand.execute());
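The fallback is not only used when run() throws: Hystrix also falls back when a call times out, is rejected, or is short-circuited. A minimal sketch of the timeout case, assuming Hystrix 1.4 or later; SlowCommand and its 200 ms timeout are hypothetical values chosen so that the 2 second sleep always exceeds the timeout:

```java
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

// Hypothetical command whose run() is slower than its configured timeout.
class SlowCommand extends HystrixCommand<String> {

    SlowCommand() {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("default"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(200)));
    }

    @Override
    protected String run() throws Exception {
        Thread.sleep(2000); // simulate a slow dependency
        return "Slow response";
    }

    @Override
    protected String getFallback() {
        return "Falling back";
    }
}
```

After execute() returns, isResponseTimedOut() and isResponseFromFallback() let a caller tell a fallback response apart from a real one.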


Monitoring

Before I wrap up the basics, it is worth demonstrating an awesome feature that Hystrix packs: the Hystrix stream and the Hystrix dashboard. Let us start with the Hystrix stream. When enabled, typically as a servlet in Java-based web applications, it provides an SSE (Server-Sent Events) stream of real-time statistics about the behavior of the Hystrix commands in the web application.

Since my demo is a Karyon2 Rx-Netty based application, my configuration can be seen here. The information from the Hystrix stream is a little too raw, though; this is where the awesome Hystrix dashboard fits in. It consumes the Hystrix stream and shows real-time aggregated information about how each Hystrix command and the underlying thread pools are performing. I have a sample Hystrix dashboard project here, based on the awesome Spring Cloud project. A sample dashboard is here:



Conclusion

This covers the Hystrix basics. There is a lot more to go; I will wrap this up in the next blog post with details on some of the more advanced Hystrix features.

Sunday, October 18, 2015

Scala extractors infix sample with Rational numbers

I keep coming back to the awesome introductory material on Scala put together by Daniel Westheide. One of the examples he provides of extractors used in an infix operation pattern comes from the Stream API:

val xs = 58 #:: 43 #:: 93 #:: Stream.empty
xs match {
  case first #:: second #:: _ => first - second
  case _ => -1
}

where the extractor is defined this way:

object #:: {
  def unapply[A](xs: Stream[A]): Option[(A, Stream[A])] =
    if (xs.isEmpty) None
    else Some((xs.head, xs.tail))
}

Given this, I wanted to try an extractor on the Rational number sample from the Scala by Example book. This is how the Rational class looks:

class Rational(n: Int, d: Int) {
  private def gcd(x: Int, y: Int): Int = {
    if (x == 0) y
    else if (x < 0) gcd(-x, y)
    else if (y < 0) -gcd(x, -y)
    else gcd(y % x, x)
  }
  private val g = gcd(n, d)

  val numer: Int = n / g
  val denom: Int = d / g

  def +(that: Rational) =
    new Rational(numer * that.denom + that.numer * denom,
      denom * that.denom)

  def -(that: Rational) =
    new Rational(numer * that.denom - that.numer * denom,
      denom * that.denom)

  def *(that: Rational) =
    new Rational(numer * that.numer, denom * that.denom)

  def /(that: Rational) =
    new Rational(numer * that.denom, denom * that.numer)

  override def toString = s"$numer/$denom"

  def square = new Rational(numer * numer, denom * denom)
}

and I wanted an extractor which would behave the following way:

val r = new Rational(2, 3)  

r match {
  case num / denom => num + "/" + denom
}   

This is entirely feasible in Scala, given the flexibility in naming identifiers. An extractor called "/" can be defined the following way:

object / {
  def unapply(r: Rational): Option[(Int, Int)] = Some((r.numer, r.denom))
}

and used for extracting the numerator and denominator of the rational number!

r match {
 case /(num, denom) => num + "/" + denom
} 

//using infix
r match {
 case num / denom => num + "/" + denom
}                             

Sunday, October 11, 2015

Gentle Introduction to Hystrix - Motivation

In the last few days I have been exploring the Netflix Hystrix library and have come to appreciate the features provided by this excellent library.

To quote from the Hystrix site:

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

There are a whole lot of keywords to parse here; however, in my mind the best way to experience Hystrix is to try out a sample use case.


An unpredictable Service

Consider a service, an odd one, which takes a JSON message of the following structure and returns an acknowledgement:


{
    "id":"1",
    "payload": "Sample Payload",
    "throw_exception":false,
    "delay_by": 0
}

The service takes in a payload, but additionally takes in two fields: "delay_by", which makes the service acknowledge after the given delay in milliseconds, and "throw_exception", which results in an exception after the specified delay!

Here is a sample response:

{
 "id":"1",
 "received":"Sample Payload",
 "payload":"Reply Message"
}


If you are following along, here is my GitHub repo with this sample. I have used Netflix Karyon 2 for it, and the code that handles the request can be expressed very concisely the following way; note how the RxJava library is put to good use here:

import com.netflix.governator.annotations.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import service1.domain.Message;
import service1.domain.MessageAcknowledgement;

import java.util.concurrent.TimeUnit;

public class MessageHandlerServiceImpl implements MessageHandlerService {

    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    @Configuration("reply.message")
    private String replyMessage;

    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        logger.info("About to Acknowledge");
        // emit once after the requested delay, without blocking the calling thread
        return Observable.timer(message.getDelayBy(), TimeUnit.MILLISECONDS)
                .map(l -> message.isThrowException())
                .map(throwException -> {
                    // fail the stream on request, after the delay has elapsed
                    if (throwException) {
                        throw new RuntimeException("Throwing an exception!");
                    }
                    return new MessageAcknowledgement(message.getId(), message.getPayload(), replyMessage);
                });
    }
}


At this point we have a good candidate service which can be made to respond with an arbitrary delay and failure.


A client to the Service

Now on to a client for this service. I am using Netflix Feign, yet another awesome library, to make this call; all it requires is a Java interface annotated the following way:

package aggregate.service;

import aggregate.domain.Message;
import aggregate.domain.MessageAcknowledgement;
import feign.RequestLine;

public interface RemoteCallService {
    @RequestLine("POST /message")
    MessageAcknowledgement handleMessage(Message message);
}

Feign creates the necessary proxy implementing this interface, using configuration along these lines:

RemoteCallService remoteCallService = Feign.builder()
        .encoder(new JacksonEncoder())
        .decoder(new JacksonDecoder())
        .target(RemoteCallService.class, "http://127.0.0.1:8889");


I have multiple endpoints that delegate calls to this remote client. All of them expose a URL pattern along these lines: http://localhost:8888/noHystrix?message=Hello&delay_by=0&throw_exception=false. This first one is an example where the endpoint does not use Hystrix.



No Hystrix Case


As a first example, consider calls to the remote service without Hystrix. If I call http://localhost:8888/noHystrix?message=Hello&delay_by=5000&throw_exception=false or http://localhost:8888/noHystrix?message=Hello&delay_by=5000&throw_exception=true, in both cases the user request simply hangs for 5 seconds before the endpoint responds.


A few things should be immediately apparent here:

1. If the service responds slowly, then the client requests to the service will be forced to wait for the response to come back.

2. Under heavy load it is very likely that all threads handling user traffic will be exhausted, thus failing further user requests.

3. If the service were to throw an exception, the client does not handle it gracefully.
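Point 2 can be made concrete without any Hystrix code. The sketch below is a plain-JDK simulation, not part of the sample project: a fixed thread pool stands in for the server's request-handling threads, and the pool size, request count and delay are arbitrary parameters rather than the settings of the load test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation: poolSize threads stand in for request-handling threads,
// and every request blocks on a dependency that takes delayMillis to answer.
class ThreadExhaustionSimulation {

    // Returns, per request, the milliseconds from the start of the run to its completion.
    static List<Long> simulate(int poolSize, int requests, long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            long start = System.nanoTime();
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                futures.add(pool.submit(() -> {
                    Thread.sleep(delayMillis); // the slow dependency holds the thread
                    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                }));
            }
            List<Long> latencies = new ArrayList<>();
            for (Future<Long> future : futures) {
                latencies.add(future.get());
            }
            return latencies;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With 2 threads, 6 requests and a 100 ms delay, the slowest request should complete after roughly 300 ms, three times the dependency's own latency, because it has to wait for two earlier requests on the same thread.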

Clearly there is a need for something like Hystrix which handles all these issues.

Hystrix command wrapping Remote calls


I ran a small load test with 50 users against the previous case and got results along these lines:

================================================================================
---- Global Information --------------------------------------------------------
> request count                                         50 (OK=50     KO=0     )
> min response time                                   5007 (OK=5007   KO=-     )
> max response time                                  34088 (OK=34088  KO=-     )
> mean response time                                 17797 (OK=17797  KO=-     )
> std deviation                                       8760 (OK=8760   KO=-     )
> response time 50th percentile                      19532 (OK=19532  KO=-     )
> response time 75th percentile                      24386 (OK=24386  KO=-     )
> mean requests/sec                                  1.425 (OK=1.425  KO=-     )

Essentially, a 5 second delay from the service results in a 75th percentile time of roughly 24 seconds! Now consider the same test with a Hystrix command wrapping the service calls:

================================================================================
---- Global Information --------------------------------------------------------
> request count                                         50 (OK=50     KO=0     )
> min response time                                      1 (OK=1      KO=-     )
> max response time                                   1014 (OK=1014   KO=-     )
> mean response time                                    22 (OK=22     KO=-     )
> std deviation                                        141 (OK=141    KO=-     )
> response time 50th percentile                          2 (OK=2      KO=-     )
> response time 75th percentile                          2 (OK=2      KO=-     )
> mean requests/sec                                 48.123 (OK=48.123 KO=-     )

Strangely, the 75th percentile time is now 2 milliseconds! How is this possible? The answer becomes obvious using the excellent tools that Hystrix provides; here is a Hystrix dashboard view for this test:



What happened here is that the first 10 requests timed out: with a Hystrix command in place, anything taking more than a second times out by default. Once the first ten transactions failed, Hystrix short-circuited the command, blocking any more requests to the remote service, hence the low response time. As for why these transactions did not show up as failed: there is a fallback in place here which responds to the user request gracefully on failure.
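To make "short-circuited" concrete, here is a deliberately simplified circuit breaker in plain Java. It is not Hystrix's implementation: it counts requests in a single window instead of Hystrix's rolling statistics window, it is not thread-safe, and the class and its parameters are hypothetical, loosely modelled on Hystrix's circuitBreaker.requestVolumeThreshold, errorThresholdPercentage and sleepWindowInMilliseconds settings.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Simplified, single-threaded circuit breaker for illustration only (NOT Hystrix's code).
class SimpleCircuitBreaker {

    private final int requestVolumeThreshold;   // minimum requests before the circuit may open
    private final int errorThresholdPercentage; // failure percentage that opens the circuit
    private final long sleepWindowMillis;       // how long to reject calls before a trial request
    private final LongSupplier clock;           // current time in ms, injectable for tests

    private int requests;
    private int failures;
    private long openedAt = -1;                 // -1 while the circuit is closed

    SimpleCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    boolean isOpen() {
        return openedAt >= 0;
    }

    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (isOpen()) {
            if (clock.getAsLong() - openedAt < sleepWindowMillis) {
                return fallback.get(); // short-circuited: the dependency is not called at all
            }
            try {
                T result = action.get(); // sleep window over: allow one trial request
                requests = 0;            // trial succeeded: close the circuit, reset counts
                failures = 0;
                openedAt = -1;
                return result;
            } catch (RuntimeException e) {
                openedAt = clock.getAsLong(); // trial failed: stay open another window
                return fallback.get();
            }
        }
        requests++;
        try {
            return action.get();
        } catch (RuntimeException e) {
            failures++;
            if (requests >= requestVolumeThreshold
                    && failures * 100 >= errorThresholdPercentage * requests) {
                openedAt = clock.getAsLong(); // too many failures: open the circuit
            }
            return fallback.get();
        }
    }
}
```

With a volume threshold of 4 and a 50% error threshold, four failing calls open the circuit, and the next call returns the fallback without touching the dependency at all, which is why short-circuited requests come back almost instantly.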

Conclusion

The purpose here was to set out the motivation for a library like Hystrix. I will follow this up with the specifics of what is needed to integrate Hystrix into an application, and the breadth of features that this excellent library provides.