Saturday, April 1, 2017

Hystrix Command - Java 8 helpers

Let me start by acknowledging that what I am posting here is far from original. It is inspired by a post by Demian Neidetcher, which was further adapted by two of my former colleagues, Alexey Dmitrovsky (T-Mobile) and Pavel Orda (Altoros).


Motivation

The motivation is fairly simple. Consider two remote calls whose results are aggregated in some way:

String  r1 = remoteCall1();
Integer r2 = remoteCall2();

String aggregated = r1 + r2;
assertThat(aggregated).isEqualTo("result1");

Ideally you would want the remote calls to be protected by the excellent Hystrix library. What if I could do it along these lines:

String  r1 = execute("remote1", "remote1", () -> remoteCall1());
Integer r2 = execute("remote2", "remote2", () -> remoteCall2());

String aggregated = r1 + r2;
assertThat(aggregated).isEqualTo("result1");

This way I have avoided all the boilerplate of defining an explicit HystrixCommand around each of my remote calls, and instead wrapped each remote call in a Java 8 lambda expression, which resolves to a Supplier functional interface.
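To make the lambda-to-Supplier step concrete, here is a minimal plain-Java sketch with no Hystrix involved. The class name SupplierSketch, its execute stand-in and the two remoteCall stubs (including their return values) are assumptions made for illustration. The stand-in simply invokes the supplier, whereas the real helper runs it inside a HystrixCommand.

```java
import java.util.function.Supplier;

// Hypothetical stand-in, NOT the Hystrix-backed helper from this post:
// it only shows how a lambda is inferred as a Supplier<T>.
final class SupplierSketch {

    // Same shape as the helper's signature; T is inferred from the lambda's return type.
    static <T> T execute(String groupKey, String commandKey, Supplier<T> toRun) {
        // The real helper would run this inside a HystrixCommand; here we just call it.
        return toRun.get();
    }

    // Stubs standing in for the remote calls (assumed return values).
    static String remoteCall1() { return "result"; }
    static Integer remoteCall2() { return 1; }

    public static void main(String[] args) {
        // A lambda and a method reference both resolve to Supplier<T>.
        String r1 = execute("remote1", "remote1", () -> remoteCall1());          // T = String
        Integer r2 = execute("remote2", "remote2", SupplierSketch::remoteCall2); // T = Integer
        System.out.println(r1 + r2); // prints result1
    }
}
```

Because the type parameter is inferred per call, the same static method serves both the String and the Integer call without any casts.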

Even better, a variation of this allows me to aggregate the results in a reactive way by returning an RxJava Observable instead:

Observable<String>  r1Obs = executeObservable("remote1", "remote1", () -> remoteCall1());
Observable<Integer> r2Obs = executeObservable("remote2", "remote2", () -> remoteCall2());

String aggregated = Observable.zip(r1Obs, r2Obs, (r1, r2) -> (r1 + r2)).toBlocking().single();

assertThat(aggregated).isEqualTo("result1");

What about fallbacks? I can support them by taking in another lambda expression, which transforms an exception into a reasonable fallback (and logs the exception in the process):


Observable<String> r1Obs = executeObservable("remote1", "remote1",
        () -> {
            throw new RuntimeException("!!");
        },
        (t) -> {
            logger.error(t.getMessage(), t);
            return "fallback";
        });
Observable<Integer> r2Obs = executeObservable("remote2", "remote2",
        () -> {
            throw new RuntimeException("!!");
        },
        (t) -> {
            logger.error(t.getMessage(), t);
            return 0;
        });

String aggregated = Observable.zip(r1Obs, r2Obs, (r1, r2) -> (r1 + r2)).toBlocking().single();

assertThat(aggregated).isEqualTo("fallback0");
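The fallback contract itself can be sketched in plain Java. This is a hypothetical stand-in (the FallbackSketch class and executeWithFallback method are names invented here) that models only "run the supplier, and on failure hand the exception to the fallback function". It does not model Hystrix's circuit breaker, timeouts or thread-pool isolation.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in that mimics only the fallback contract, without Hystrix:
// no circuit breaker, timeout, or thread-pool isolation is modeled here.
final class FallbackSketch {

    static <T> T executeWithFallback(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            return toRun.get();
        } catch (RuntimeException e) {
            // Hand the failure to the fallback so it can log it and pick a default.
            return fallback.apply(e);
        }
    }

    public static void main(String[] args) {
        String r1 = executeWithFallback(
                () -> { throw new RuntimeException("!!"); },
                t -> "fallback");
        Integer r2 = executeWithFallback(
                () -> { throw new RuntimeException("!!"); },
                t -> 0);
        System.out.println(r1 + r2); // prints fallback0
    }
}
```

Passing the Throwable into the fallback, rather than using a no-argument supplier, is what lets the caller log the failure and choose a default in one place.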


Implementation


The implementation is fairly simple. Here it is in its entirety:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import rx.Observable;

import java.util.function.Function;
import java.util.function.Supplier;

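/**
 * Wraps a Supplier in a HystrixCommand, with an optional fallback Function
 * that maps the execution exception to a default value.
 */
public class GenericHystrixCommand<T> extends HystrixCommand<T> {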

    private final Supplier<T> toRun;

    private final Function<Throwable, T> fallback;


    public static <T> T execute(String groupKey, String commandkey, Supplier<T> toRun) {
        return execute(groupKey, commandkey, toRun, null);
    }

    public static <T> T execute(String groupKey, String commandkey, 
               Supplier<T> toRun, Function<Throwable, T> fallback) {
        return new GenericHystrixCommand<>(groupKey, commandkey, toRun, fallback).execute();
    }

    public static <T> Observable<T> executeObservable(String groupKey, String commandkey, 
               Supplier<T> toRun) {
        return executeObservable(groupKey, commandkey, toRun, null);
    }

    public static <T> Observable<T> executeObservable(String groupKey, String commandkey, 
               Supplier<T> toRun, Function<Throwable, T> fallback) {
        return new GenericHystrixCommand<>(groupKey, commandkey, toRun, fallback)
                .toObservable();
    }

    public GenericHystrixCommand(String groupKey, String commandkey, 
               Supplier<T> toRun, Function<Throwable, T> fallback) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandkey)));
        this.toRun = toRun;
        this.fallback = fallback;
    }

    @Override
    protected T run() throws Exception {
        return this.toRun.get();
    }

    @Override
    protected T getFallback() {
        return (this.fallback != null)
                ? this.fallback.apply(getExecutionException())
                : super.getFallback();
    }
}


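All it does is take in the code that needs to be wrapped as a Java 8 Supplier and the fallback as a Java 8 Function. When no fallback is passed in, getFallback() defers to the HystrixCommand default, which throws an UnsupportedOperationException, so the failure surfaces to the caller as a HystrixRuntimeException.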


If you are interested in playing with this pattern, I have a slightly more fleshed-out sample here in my GitHub repo.
