Motivation
The motivation is fairly simple. Consider two remote calls whose results are aggregated in some way:
String r1 = remoteCall1();
Integer r2 = remoteCall2();
String aggregated = r1 + r2;
assertThat(aggregated).isEqualTo("result1");
Ideally, you would want the remote calls to be protected by the excellent Hystrix library. What if I could do it along these lines:
String r1 = execute("remote1", "remote1", () -> remoteCall1());
Integer r2 = execute("remote2", "remote2", () -> remoteCall2());
String aggregated = r1 + r2;
assertThat(aggregated).isEqualTo("result1");
This way I have avoided all the boilerplate of defining an explicit HystrixCommand around each of my remote calls, and have instead wrapped each remote call in a Java 8 lambda expression, which resolves to a Supplier functional interface.
Even better, a variation of this allows me to aggregate the results in a reactive way by returning an RxJava Observable instead:
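To make the "resolves to a Supplier" point concrete, here is a minimal, dependency-free sketch. The execute method below is a hypothetical stand-in with the same shape as the Hystrix-backed one (it only invokes the supplier and offers no protection), and remoteCall1 and remoteCall2 are placeholder stubs I have assumed for illustration:

```java
import java.util.function.Supplier;

class SupplierTargetTyping {

    // Hypothetical stand-in for the Hystrix-backed execute(...):
    // it ignores the keys and simply invokes the supplier.
    static <T> T execute(String groupKey, String commandKey, Supplier<T> toRun) {
        return toRun.get();
    }

    // Placeholder stubs standing in for the remote calls.
    static String remoteCall1() { return "result"; }
    static Integer remoteCall2() { return 1; }

    public static void main(String[] args) {
        // The lambda is target-typed to Supplier<String>, so T is inferred as String.
        String r1 = execute("remote1", "remote1", () -> remoteCall1());
        // A method reference resolves to the same functional interface.
        Integer r2 = execute("remote2", "remote2", SupplierTargetTyping::remoteCall2);
        System.out.println(r1 + r2); // prints "result1"
    }
}
```

Because the return type T is inferred from the supplier, the caller needs neither casts nor a dedicated command class per call.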
Observable<String> r1Obs = executeObservable("remote1", "remote1", () -> remoteCall1());
Observable<Integer> r2Obs = executeObservable("remote2", "remote2", () -> remoteCall2());
String aggregated = Observable.zip(r1Obs, r2Obs, (r1, r2) -> (r1 + r2)).toBlocking().single();
assertThat(aggregated).isEqualTo("result1");
What about fallbacks? I can support them by taking in another lambda expression, which transforms an exception into a reasonable fallback value (and logs the exception in the process):
Observable<String> r1Obs = executeObservable("remote1", "remote1",
        () -> { throw new RuntimeException("!!"); },
        (t) -> { logger.error(t.getMessage(), t); return "fallback"; });
Observable<Integer> r2Obs = executeObservable("remote2", "remote2",
        () -> { throw new RuntimeException("!!"); },
        (t) -> { logger.error(t.getMessage(), t); return 0; });
String aggregated = Observable.zip(r1Obs, r2Obs, (r1, r2) -> (r1 + r2)).toBlocking().single();
assertThat(aggregated).isEqualTo("fallback0");
Implementation
The implementation is fairly simple and in its entirety is the following:
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import rx.Observable;

import java.util.function.Function;
import java.util.function.Supplier;

public class GenericHystrixCommand<T> extends HystrixCommand<T> {

    private Supplier<T> toRun;
    private Function<Throwable, T> fallback;

    public static <T> T execute(String groupKey, String commandkey, Supplier<T> toRun) {
        return execute(groupKey, commandkey, toRun, null);
    }

    public static <T> T execute(String groupKey, String commandkey,
                                Supplier<T> toRun, Function<Throwable, T> fallback) {
        return new GenericHystrixCommand<>(groupKey, commandkey, toRun, fallback).execute();
    }

    public static <T> Observable<T> executeObservable(String groupKey, String commandkey,
                                                      Supplier<T> toRun) {
        return executeObservable(groupKey, commandkey, toRun, null);
    }

    public static <T> Observable<T> executeObservable(String groupKey, String commandkey,
                                                      Supplier<T> toRun, Function<Throwable, T> fallback) {
        return new GenericHystrixCommand<>(groupKey, commandkey, toRun, fallback)
                .toObservable();
    }

    public GenericHystrixCommand(String groupKey, String commandkey,
                                 Supplier<T> toRun, Function<Throwable, T> fallback) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandkey)));
        this.toRun = toRun;
        this.fallback = fallback;
    }

    @Override
    protected T run() throws Exception {
        return this.toRun.get();
    }

    @Override
    protected T getFallback() {
        return (this.fallback != null)
                ? this.fallback.apply(getExecutionException())
                : super.getFallback();
    }
}
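All it does is take in the code that needs to be wrapped as a Java 8 Supplier and the fallback as a Java 8 Function. When no fallback is supplied, getFallback() defers to the default HystrixCommand behavior, which reports that no fallback is available.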
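The interplay between the Supplier and the fallback Function can be seen in isolation with a dependency-free sketch. This is a hypothetical stand-in, not what Hystrix does internally: Hystrix also triggers fallbacks on timeouts, rejections and open circuits, and wraps failures in its own exception type. The class and method names here are assumptions made purely for illustration:

```java
import java.util.function.Function;
import java.util.function.Supplier;

class FallbackSketch {

    // Hypothetical, Hystrix-free stand-in: run the supplier; on failure,
    // hand the exception to the fallback if one was given, else rethrow.
    static <T> T executeWithFallback(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            return toRun.get();
        } catch (RuntimeException e) {
            if (fallback != null) {
                return fallback.apply(e);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        // The supplier fails, so the fallback function produces the value.
        String r1 = executeWithFallback(
                () -> { throw new RuntimeException("!!"); },
                t -> "fallback");
        // The supplier succeeds, so the fallback is never consulted.
        Integer r2 = executeWithFallback(() -> 42, t -> 0);
        System.out.println(r1 + r2); // prints "fallback42"
    }
}
```

The null check mirrors the one in getFallback() above: the fallback is optional, and without it the failure propagates to the caller.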
If you are interested in playing with this pattern, I have a slightly more fleshed-out sample here in my GitHub repo.