Friday, April 21, 2017

Spring Web-Flux - Functional Style with Cassandra Backend

In a previous post I walked through the basics of Spring Web-Flux, the reactive support in the web layer of the Spring framework.

I demonstrated an end-to-end sample that used Spring Data Cassandra together with the traditional annotation support in the Spring web layer, along these lines:

...
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
...

@RestController
@RequestMapping("/hotels")
public class HotelController {

    @GetMapping(path = "/{id}")
    public Mono<Hotel> get(@PathVariable("id") UUID uuid) {
        ...
    }

    @GetMapping(path = "/startingwith/{letter}")
    public Flux<HotelByLetter> findHotelsWithLetter(
            @PathVariable("letter") String letter) {
        ...
    }

}

This looks like the traditional Spring Web annotations except for the return types. Instead of returning the domain types, these endpoints return a Publisher, via the Mono and Flux implementations in reactor-core, and Spring Web handles streaming the content back.


In this post I will cover a different way of exposing the endpoints - using a functional style instead of the annotations style. Let me acknowledge that I have found Baeldung's article and Rossen Stoyanchev's post invaluable in my understanding of the functional style of exposing the web endpoints.


Mapping the annotations to routes

Let me start with a few annotation based endpoints, one to retrieve an entity and one to save an entity:

@GetMapping(path = "/{id}")
public Mono<Hotel> get(@PathVariable("id") UUID uuid) {
    return this.hotelService.findOne(uuid);
}

@PostMapping
public Mono<ResponseEntity<Hotel>> save(@RequestBody Hotel hotel) {
    return this.hotelService.save(hotel)
            .map(savedHotel -> new ResponseEntity<>(savedHotel, HttpStatus.CREATED));
}


In a functional style of exposing the endpoints, each endpoint translates to a RouterFunction, and these can be composed to create all the endpoints of the app, along these lines:

package cass.web;

import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;

public interface ApplicationRoutes {
    static RouterFunction<?> routes(HotelHandler hotelHandler) {
        return nest(path("/hotels"),
                nest(accept(MediaType.APPLICATION_JSON),
                        route(GET("/{id}"), hotelHandler::get)
                                .andRoute(POST("/"), hotelHandler::save)
                ));
    }
}


There are helper functions (nest, route, GET, accept, etc.) which make composing the RouterFunctions a breeze. Once a matching RouterFunction is found, the request is handled by a HandlerFunction. In the sample above the handler functions are grouped in HotelHandler, which for the save and get functionality looks like this:

import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.UUID;

@Service
public class HotelHandler {

    ...
    
    public Mono<ServerResponse> get(ServerRequest request) {
        UUID uuid = UUID.fromString(request.pathVariable("id"));
        Mono<ServerResponse> notFound = ServerResponse.notFound().build();
        return this.hotelService.findOne(uuid)
                .flatMap(hotel -> ServerResponse.ok().body(Mono.just(hotel), Hotel.class))
                .switchIfEmpty(notFound);
    }

    public Mono<ServerResponse> save(ServerRequest serverRequest) {
        Mono<Hotel> hotelToBeCreated = serverRequest.bodyToMono(Hotel.class);
        return hotelToBeCreated.flatMap(hotel ->
                ServerResponse.status(HttpStatus.CREATED).body(hotelService.save(hotel), Hotel.class)
        );
    }

    ...
}    
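
The found-or-404 shape of the get handler can be seen without any Spring types. The following is a minimal sketch in plain Java, assuming Optional as a synchronous stand-in for Mono: map plays the role of flatMap and orElse the role of switchIfEmpty. HandlerSketch, Response and the in-memory map are hypothetical names used only for illustration; this is not the WebFlux API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Plain-Java model of the "found -> 200, empty -> 404" handler logic.
// Optional stands in for Mono here; none of this is the WebFlux API.
final class HandlerSketch {

    // Hypothetical response type: a status code plus a body (null when absent).
    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    private final Map<UUID, String> hotels;

    HandlerSketch(Map<UUID, String> hotels) {
        this.hotels = hotels;
    }

    // Stand-in for hotelService.findOne(uuid): empty when nothing matches.
    Optional<String> findOne(UUID id) {
        return Optional.ofNullable(hotels.get(id));
    }

    Response get(String rawId) {
        UUID id = UUID.fromString(rawId);
        Response notFound = new Response(404, null);
        return findOne(id)
                .map(name -> new Response(200, name)) // found: wrap the value in a 200
                .orElse(notFound);                    // empty: fall back to the 404
    }

    public static void main(String[] args) {
        UUID known = UUID.fromString("fd28ec06-6de5-4f68-9353-59793a5bdec2");
        HandlerSketch handler = new HandlerSketch(Collections.singletonMap(known, "test"));
        System.out.println(handler.get(known.toString()).status);                     // prints 200
        System.out.println(handler.get("00000000-0000-0000-0000-000000000000").status); // prints 404
    }
}
```

The point of the sketch is only that the "not found" response is prepared up front and substituted when the lookup yields nothing, which is what switchIfEmpty does in the reactive version.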


This is what a complete RouterFunction for all the APIs supported by the original annotation-based project looks like:

import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;

public interface ApplicationRoutes {
    static RouterFunction<?> routes(HotelHandler hotelHandler) {
        return nest(path("/hotels"),
                nest(accept(MediaType.APPLICATION_JSON),
                        route(GET("/{id}"), hotelHandler::get)
                                .andRoute(POST("/"), hotelHandler::save)
                                .andRoute(PUT("/"), hotelHandler::update)
                                .andRoute(DELETE("/{id}"), hotelHandler::delete)
                                .andRoute(GET("/startingwith/{letter}"), hotelHandler::findHotelsWithLetter)
                                .andRoute(GET("/fromstate/{state}"), hotelHandler::findHotelsInState)
                ));
    }
}
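
To make the composition concrete, here is a minimal sketch in plain Java of how such a route table could behave. Each route is a function from a request to an optional handler, andRoute tries the next route when the previous one declines, and nest strips a path prefix before consulting the inner routes. This is a simplified model and not Spring's implementation: Route, Request, dispatch and the exact-match route helper (no {id} templates) are assumptions made for the example, as is the /featured path.

```java
import java.util.Optional;
import java.util.function.Function;

// Simplified model of RouterFunction composition; not the WebFlux implementation.
final class RoutingSketch {

    // Hypothetical request: just an HTTP method and a path.
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    // A route either yields a handler for the request or declines with Optional.empty().
    interface Route {
        Optional<Function<Request, String>> match(Request request);

        // Try this route first; fall through to the other one if it declines.
        default Route andRoute(Route other) {
            return request -> {
                Optional<Function<Request, String>> first = match(request);
                return first.isPresent() ? first : other.match(request);
            };
        }
    }

    // Matches on exact method and path (path templates are left out of this sketch).
    static Route route(String method, String path, Function<Request, String> handler) {
        return request -> method.equals(request.method) && path.equals(request.path)
                ? Optional.of(handler)
                : Optional.empty();
    }

    // Consults the inner route only when the path starts with the prefix,
    // and hands it the remainder of the path.
    static Route nest(String prefix, Route inner) {
        return request -> request.path.startsWith(prefix)
                ? inner.match(new Request(request.method, request.path.substring(prefix.length())))
                : Optional.empty();
    }

    // Runs the matched handler, or reports "404" when no route matched.
    static String dispatch(Route routes, Request request) {
        return routes.match(request)
                .map(handler -> handler.apply(request))
                .orElse("404");
    }

    public static void main(String[] args) {
        Route routes = nest("/hotels",
                route("GET", "/featured", r -> "featured")
                        .andRoute(route("POST", "/", r -> "saved")));

        System.out.println(dispatch(routes, new Request("GET", "/hotels/featured"))); // prints featured
        System.out.println(dispatch(routes, new Request("POST", "/hotels/")));        // prints saved
        System.out.println(dispatch(routes, new Request("GET", "/other")));           // prints 404
    }
}
```

Modelling a route as "request in, optional handler out" is what makes the composition explicit: the whole table is an ordinary value that can be built, passed around and tested like any other function.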

Testing functional Routes

These routes are also easy to test. Spring WebFlux provides a WebTestClient to exercise the routes while allowing the implementations behind them to be mocked.

For example, to test the get-by-id endpoint, I bind the WebTestClient to the RouterFunction defined earlier and use the assertions it provides to test the behavior.

import org.junit.Before;
import org.junit.Test;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

import java.util.UUID;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class GetRouteTests {

    private WebTestClient client;
    private HotelService hotelService;

    private UUID sampleUUID = UUID.fromString("fd28ec06-6de5-4f68-9353-59793a5bdec2");

    @Before
    public void setUp() {
        this.hotelService = mock(HotelService.class);
        when(hotelService.findOne(sampleUUID)).thenReturn(Mono.just(new Hotel(sampleUUID, "test")));
        HotelHandler hotelHandler = new HotelHandler(hotelService);
        
        this.client = WebTestClient.bindToRouterFunction(ApplicationRoutes.routes(hotelHandler)).build();
    }

    @Test
    public void testHotelGet() throws Exception {
        this.client.get().uri("/hotels/" + sampleUUID)
                .exchange()
                .expectStatus().isOk()
                .expectBody(Hotel.class)
                .isEqualTo(new Hotel(sampleUUID, "test"));
    }
}

Conclusion

The functional way of defining the routes is definitely a very different approach from the annotation-based one. I like that it is a far more explicit way of defining an endpoint and how the calls to that endpoint are handled; the annotations always felt a little more magical.

I have complete working code in my GitHub repo, which may be easier to follow than the code in this post.

Saturday, April 1, 2017

Hystrix Command - Java 8 helpers

Let me start by acknowledging that what I am posting here is far from original. It is inspired by the post here by Demian Neidetcher, which was further adapted by two of my former colleagues: Alexey Dmitrovsky1 (T-Mobile) and Pavel Orda (Altoros).


Motivation

The motivation is fairly simple. Consider two remote calls whose results are aggregated in some way:

String  r1 = remoteCall1();
Integer r2 = remoteCall2();

String aggregated = r1 + r2;
assertThat(aggregated).isEqualTo("result1");

Ideally you would want the remote calls to be protected by the excellent Hystrix library. What if I could do it along these lines:

String  r1 = execute("remote1", "remote1", () -> remoteCall1());
Integer r2 = execute("remote2", "remote2", () -> remoteCall2());

String aggregated = r1 + r2;
assertThat(aggregated).isEqualTo("result1");

This way I avoid all the boilerplate of defining an explicit HystrixCommand around each of my remote calls, and instead wrap each remote call in a Java 8 lambda expression which resolves to the Supplier functional interface.

Even better, a variation of this allows me to aggregate the results in a reactive way by returning an RxJava Observable instead:

Observable<String>  r1Obs = executeObservable("remote1", "remote1", () -> remoteCall1());
Observable<Integer> r2Obs = executeObservable("remote2", "remote2", () -> remoteCall2());

String aggregated = Observable.zip(r1Obs, r2Obs, (r1, r2) -> (r1 + r2)).toBlocking().single();

assertThat(aggregated).isEqualTo("result1");
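
The zip step combines one value from each source once both have arrived. As a rough illustration of that aggregation without the Hystrix or RxJava dependencies, here is a sketch that swaps in the JDK's CompletableFuture.thenCombine. It is an analogue, not the same mechanism: remoteCall1 and remoteCall2 are stubbed with hypothetical return values chosen to match the "result1" assertion, and ZipSketch is a made-up name.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogue of zipping two asynchronous results; not RxJava or Hystrix.
final class ZipSketch {

    // Hypothetical stubs standing in for the two remote calls.
    static String remoteCall1() {
        return "result";
    }

    static Integer remoteCall2() {
        return 1;
    }

    static String aggregate() {
        CompletableFuture<String> r1 = CompletableFuture.supplyAsync(ZipSketch::remoteCall1);
        CompletableFuture<Integer> r2 = CompletableFuture.supplyAsync(ZipSketch::remoteCall2);
        // thenCombine waits for both futures and then applies the combining function,
        // much like Observable.zip does for two single-valued Observables.
        return r1.thenCombine(r2, (a, b) -> a + b).join();
    }

    public static void main(String[] args) {
        System.out.println(aggregate()); // prints result1
    }
}
```

As with toBlocking().single(), the join() call blocks the caller; it is there only so the result can be asserted on in a test.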

What about fallbacks? I can support them by taking in another lambda expression which transforms an exception into a reasonable fallback (and logs the exception in the process):


Observable<String> r1Obs = executeObservable("remote1", "remote1",
        () -> {
            throw new RuntimeException("!!");
        },
        (t) -> {
            logger.error(t.getMessage(), t);
            return "fallback";
        });
Observable<Integer> r2Obs = executeObservable("remote2", "remote2",
        () -> {
            throw new RuntimeException("!!");
        },
        (t) -> {
            logger.error(t.getMessage(), t);
            return 0;
        });

String aggregated = Observable.zip(r1Obs, r2Obs, (r1, r2) -> (r1 + r2)).toBlocking().single();

assertThat(aggregated).isEqualTo("fallback0");


Implementation


The implementation is fairly simple and in its entirety is the following:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import rx.Observable;

import java.util.function.Function;
import java.util.function.Supplier;

public class GenericHystrixCommand<T> extends HystrixCommand<T> {

    private final Supplier<T> toRun;

    private final Function<Throwable, T> fallback;


    public static <T> T execute(String groupKey, String commandkey, Supplier<T> toRun) {
        return execute(groupKey, commandkey, toRun, null);
    }

    public static <T> T execute(String groupKey, String commandkey, 
               Supplier<T> toRun, Function<Throwable, T> fallback) {
        return new GenericHystrixCommand<>(groupKey, commandkey, toRun, fallback).execute();
    }

    public static <T> Observable<T> executeObservable(String groupKey, String commandkey, 
               Supplier<T> toRun) {
        return executeObservable(groupKey, commandkey, toRun, null);
    }

    public static <T> Observable<T> executeObservable(String groupKey, String commandkey, 
               Supplier<T> toRun, Function<Throwable, T> fallback) {
        return new GenericHystrixCommand<>(groupKey, commandkey, toRun, fallback)
                .toObservable();
    }

    public GenericHystrixCommand(String groupKey, String commandkey, 
               Supplier<T> toRun, Function<Throwable, T> fallback) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandkey)));
        this.toRun = toRun;
        this.fallback = fallback;
    }

    @Override
    protected T run() throws Exception {
        return this.toRun.get();
    }

    @Override
    protected T getFallback() {
        return (this.fallback != null)
                ? this.fallback.apply(getExecutionException())
                : super.getFallback();
    }
}


All it does is take in the code that needs to be wrapped as a Java 8 Supplier and the fallback as a Java 8 Function. When no fallback is supplied, it defers to HystrixCommand's default getFallback().
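
Stripped of the Hystrix machinery, the shape of the helper is just a Supplier paired with an optional Function from Throwable to the result type. The sketch below shows only that lambda-wrapping shape using a plain try/catch. It provides none of Hystrix's isolation, timeouts or circuit breaking, it only catches RuntimeException, and FallbackSketch is a hypothetical name.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrates the Supplier + fallback Function shape only; no Hystrix features.
final class FallbackSketch {

    // Variant without a fallback: failures propagate to the caller.
    static <T> T execute(Supplier<T> toRun) {
        return execute(toRun, null);
    }

    // Variant with a fallback: the exception is handed to the fallback function.
    static <T> T execute(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            return toRun.get();
        } catch (RuntimeException e) {
            if (fallback == null) {
                throw e; // no fallback supplied: let the failure propagate
            }
            return fallback.apply(e);
        }
    }

    public static void main(String[] args) {
        String ok = execute(() -> "result");
        String recovered = execute(
                () -> {
                    throw new IllegalStateException("!!");
                },
                t -> "fallback");
        System.out.println(ok + " " + recovered); // prints result fallback
    }
}
```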


If you are interested in playing with this pattern, I have a slightly more fleshed-out sample here in my GitHub repo.