Tuesday, July 28, 2015

Spring Boot @ConfigurationProperties

Spring Boot provides a very neat way to load properties for an application. Consider a set of properties described using yaml format:

prefix:
    stringProp1: propValue1
    stringProp2: propValue2
    intProp1: 10
    listProp:
        - listValue1
        - listValue2
    mapProp:
        key1: mapValue1
        key2: mapValue2

These entries can also be described in a traditional application.properties file the following way:

prefix.stringProp1=propValue1
prefix.stringProp2=propValue2
prefix.intProp1=10
prefix.listProp[0]=listValue1
prefix.listProp[1]=listValue2
prefix.mapProp.key1=mapValue1
prefix.mapProp.key2=mapValue2

It has taken me a little while, but I do like the hierarchical look of the properties described in a yaml format.

So now, given this property file a traditional Spring application would have loaded up the properties the following way:

public class SamplePropertyLoadingTest {
    @Value("${prefix.stringProp1}")
    private String stringProp1;

Note the placeholder for "prefix.stringProp" key.

This however is not ideal for loading a family of related properties, say in this specific case namespaced by the prefix conveniently named "prefix".

The approach Spring boot takes is to define a bean that can hold all the family of related properties this way:

@ConfigurationProperties(prefix = "prefix")
@Component
public class SampleProperty {
    private String stringProp1;
    private String stringProp2;
    @Max(99)
    @Min(0)
    private Integer intProp1;
    private List<String> listProp;
    private Map<String, String> mapProp;
    
    ...
}

At runtime, all the fields would be bound to the related properties cleanly.

Additionally note the JSR-303 annotations on top of the "intProp1" field that validates that value of the field is between 0 and 99, @ConfigurationProperties will call the validator to ensure that bound bean is validated.

An integration test making use of this is the following:

package prop;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SampleWebApplication.class)
public class SamplePropertyLoadingTest {
    @Autowired
    private SampleProperty sampleProperty;

    @Value("${prefix.stringProp1}")
    private String stringProp1;

    @Test
    public void testLoadingOfProperties() {
        System.out.println("stringProp1 = " + stringProp1);
        assertThat(sampleProperty.getStringProp1(), equalTo("propValue1"));
        assertThat(sampleProperty.getStringProp2(), equalTo("propValue2"));
        assertThat(sampleProperty.getIntProp1(), equalTo(10));
        assertThat(sampleProperty.getListProp(), hasItems("listValue1", "listValue2"));
        assertThat(sampleProperty.getMapProp(), allOf(hasEntry("key1", "mapValue1"),
                hasEntry("key2", "mapValue2")));
    }
}

If you are interested in exploring this sample further, I have a github repo with the code checked in here.

Thursday, July 23, 2015

Scatter Gather - Using Java 8 CompletableFuture and Rx-Java Observable

I wanted to explore a simple scatter-gather scenario using Java 8 CompletableFuture and using Rx-Java Observable.


The scenario is simple - Spawn about 10 tasks, each returning a string, and ultimately collect the results into a list.

Sequential

A sequential version of this would be the following:

public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

 logger.info(list.toString());
}

private String generateTask(int i) {
 Util.delay(2000);
 return i + "-" + "test";
}

With CompletableFuture

A method can be made to return a CompletableFuture using a utility method called supplyAsync, I am using a variation of this method which accepts an explicit Executor to use, also I am deliberately throwing an exception for one of the inputs:

private CompletableFuture<String> generateTask(int i,
  ExecutorService executorService) {
 return CompletableFuture.supplyAsync(() -> {
  Util.delay(2000);
  if (i == 5) {
   throw new RuntimeException("Run, it is a 5!");
  }
  return i + "-" + "test";
 }, executorService);
}

Now to scatter the tasks:

List<CompletableFuture<String>> futures =
  IntStream.range(0, 10)
    .boxed()
    .map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage()))
    .collect(Collectors.toList());

At the end of scattering the tasks the result is a list of CompletableFuture. Now, to obtain the list of String from this is a little tricky, here I am using one of the solutions suggested in Stackoverflow:

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
  .thenApply(v -> futures.stream()
       .map(CompletableFuture::join)
       .collect(Collectors.toList()));

CompletableFuture.allOf method is being used here purely to compose the next action to take once all the scattered tasks are completed, once the tasks are completed the futures are again streamed and collected into a list of string.

The final result can then be presented asynchronously:
result.thenAccept(l -> {
 logger.info(l.toString());
});


With Rx-java Observable

Scatter gather with Rx-java is relatively cleaner than the CompletableFuture version as Rx-java provides better ways to compose the results together, again the method which performs the scattered task:

private Observable<String> generateTask(int i, ExecutorService executorService) {
    return Observable
            .<String>create(s -> {
                Util.delay(2000);
                if ( i == 5) {
                    throw new RuntimeException("Run, it is a 5!");
                }
                s.onNext( i + "-test");
                s.onCompleted();
            }).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}

and to scatter the tasks:

List<Observable<String>> obs =
        IntStream.range(0, 10)
            .boxed()
            .map(i -> generateTask(i, executors)).collect(Collectors.toList());

Once more I have a List of Observable's, and what I need is a List of results, Observable provides a merge method to do just that:

Observable<List<String>> merged = Observable.merge(obs).toList();

which can be subscribed to and the results printed when available:

merged.subscribe(
                l -> logger.info(l.toString()));

Friday, July 10, 2015

Rx-netty and Karyon2 based cloud ready microservice - Dependency Injection

I had previously written about using Rx-netty and Karyon2 for developing cloud ready microservices, there were a few issues with the sample though, partly reproduced here:

package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

The issues are:

  1. The routing logic is not centralized, the request handler has both the routing logic and the processing logic
  2. The dependencies are not injected in cleanly.


Looking at the Karyon2 samples, both of these issues are actually very cleanly addressed now which I wanted to document here.

Routing

Routing can be centralized using a custom Rx-netty RequestHandler called the SimpleUriRouter
The routes can be registered the following way using SimpleRouter which is being created here using a Guice Provider:

import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.buffer.ByteBuf;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.transport.http.SimpleUriRouter;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.app.ApplicationMessageHandler;
import org.bk.samplepong.common.health.HealthCheck;

public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> {

    @Inject
    private HealthCheck healthCheck;

    @Inject
    private ApplicationMessageHandler applicationMessageHandler;

    @Override
    public SimpleUriRouter get() {
        SimpleUriRouter simpleUriRouter = new SimpleUriRouter();
        simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck));
        simpleUriRouter.addUri("/message", applicationMessageHandler);
        return simpleUriRouter;
    }
}

This router can now be registered via a custom guice module the following way:

public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> {

    public KaryonAppModule() {
        super("routerModule", ByteBuf.class, ByteBuf.class);
    }

    @Override
    protected void configureServer() {
        bindRouter().toProvider(new AppRouteProvider());

        interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class);

        server().port(8888);
    }
}

This is essentially it, now the routing logic is cleanly separated from the processing logic.

Dependency Injection


Dependency injection is handled via custom guice modules. I have a service, call it the MessageHandlerService, which takes in a message and returns an Acknowledgement, this service is defined as follows:

public class MessageHandlerServiceImpl implements MessageHandlerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        return Observable.<MessageAcknowledgement>create(s -> {
            s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong"));
            s.onCompleted();
        });
    }


}

Now, I have a guice module which specifies the binding between MessageHandlerService interface and the concrete MessageHandlerServiceImpl:

public class AppModule extends AbstractModule {


    @Override
    protected void configure() {
        bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON);
    }
}


With this in place, the MessageHandlerService can be injected in:

public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final MessageHandlerService messageHandlerService;

    @Inject
    public ApplicationMessageHandler(MessageHandlerService messageHandlerService) {
        this.messageHandlerService = messageHandlerService;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -> {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .flatMap(messageHandlerService::handleMessage)
                .flatMap(ack -> {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    }
}


With both these features implemented, the app using Karyon2 is also greatly simplified and I have the complete working app in my github repository here: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong