Friday, July 10, 2015

Rx-netty and Karyon2 based cloud ready microservice - Dependency Injection

I had previously written about using Rx-netty and Karyon2 to develop cloud-ready microservices. There were a few issues with that sample, though; it is partly reproduced here:

package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

The issues are:

  1. The routing logic is not centralized: the request handler contains both the routing logic and the processing logic.
  2. The dependencies are not injected cleanly.


Looking at the Karyon2 samples, both of these issues are now addressed very cleanly, which I want to document here.

Routing

Routing can be centralized using a custom Rx-netty RequestHandler called SimpleUriRouter.
The routes can be registered as follows with a SimpleUriRouter, which is created here by a Guice Provider:

import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.buffer.ByteBuf;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.transport.http.SimpleUriRouter;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.app.ApplicationMessageHandler;
import org.bk.samplepong.common.health.HealthCheck;

public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> {

    @Inject
    private HealthCheck healthCheck;

    @Inject
    private ApplicationMessageHandler applicationMessageHandler;

    @Override
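    public SimpleUriRouter<ByteBuf, ByteBuf> get() {
        // Use the parameterized type rather than the raw type to keep compile-time type checks.
        SimpleUriRouter<ByteBuf, ByteBuf> simpleUriRouter = new SimpleUriRouter<>();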
        simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck));
        simpleUriRouter.addUri("/message", applicationMessageHandler);
        return simpleUriRouter;
    }
}

This router can now be registered via a custom Guice module as follows:

public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> {

    public KaryonAppModule() {
        super("routerModule", ByteBuf.class, ByteBuf.class);
    }

    @Override
    protected void configureServer() {
        bindRouter().toProvider(new AppRouteProvider());

        interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class);

        server().port(8888);
    }
}

That is essentially it; the routing logic is now cleanly separated from the processing logic.
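To make the separation concrete, here is a minimal, dependency-free sketch of the idea behind a URI router: the router is itself a handler that only decides which registered handler gets the request. This is not Karyon's SimpleUriRouter implementation; the Handler interface, the PrefixRouter class, the prefix matching rule and the string responses are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a request handler: takes a URI, returns a response body.
interface Handler {
    String handle(String uri);
}

// Simplified illustration of a URI router; NOT Karyon's SimpleUriRouter.
class PrefixRouter implements Handler {
    // LinkedHashMap keeps registration order, so the first matching route wins.
    private final Map<String, Handler> routes = new LinkedHashMap<>();

    PrefixRouter addUri(String prefix, Handler handler) {
        routes.put(prefix, handler);
        return this;
    }

    @Override
    public String handle(String uri) {
        for (Map.Entry<String, Handler> route : routes.entrySet()) {
            if (uri.startsWith(route.getKey())) {
                // Routing ends here; processing belongs to the registered handler.
                return route.getValue().handle(uri);
            }
        }
        // Assumed fallback, mirroring the NOT_FOUND branch of the original handler.
        return "404 Not Found";
    }
}

class PrefixRouterDemo {
    public static void main(String[] args) {
        Handler router = new PrefixRouter()
                .addUri("/healthcheck", uri -> "200 OK")
                .addUri("/message", uri -> "Pong");
        System.out.println(router.handle("/message"));
        System.out.println(router.handle("/unknown"));
    }
}
```

With this shape, adding a route means registering one more handler, and none of the existing handlers need to change.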

Dependency Injection


Dependency injection is handled via custom Guice modules. I have a service, call it MessageHandlerService, which takes in a message and returns an acknowledgement. Its implementation is as follows:

public class MessageHandlerServiceImpl implements MessageHandlerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    @Override
    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        return Observable.<MessageAcknowledgement>create(s -> {
            s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong"));
            s.onCompleted();
        });
    }
}

Now, I have a Guice module that specifies the binding between the MessageHandlerService interface and the concrete MessageHandlerServiceImpl:

public class AppModule extends AbstractModule {


    @Override
    protected void configure() {
        bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON);
    }
}


With this in place, the MessageHandlerService can be injected into the request handler through its constructor:

public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final MessageHandlerService messageHandlerService;

    @Inject
    public ApplicationMessageHandler(MessageHandlerService messageHandlerService) {
        this.messageHandlerService = messageHandlerService;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -> {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .flatMap(messageHandlerService::handleMessage)
                .flatMap(ack -> {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    }
}
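One practical benefit of constructor injection is that the handler can be exercised with a stub service and no injector at all. The sketch below shows this with simplified, hypothetical stand-ins: the real service works with Message, MessageAcknowledgement and Observable, while plain strings are used here to keep the example dependency-free. SketchMessageHandler is an illustrative name, not a class from the sample app.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the service interface (assumption: String in, String out).
interface MessageHandlerService {
    String handleMessage(String payload);
}

// Mirrors the shape of ApplicationMessageHandler: the dependency arrives via the constructor.
class SketchMessageHandler {
    private final MessageHandlerService messageHandlerService;

    SketchMessageHandler(MessageHandlerService messageHandlerService) {
        this.messageHandlerService = messageHandlerService;
    }

    String handle(String requestBody) {
        // The handler delegates processing to whichever implementation was supplied.
        return messageHandlerService.handleMessage(requestBody);
    }
}

class SketchMessageHandlerDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // A stub that records what it was asked to handle.
        MessageHandlerService stub = payload -> {
            seen.add(payload);
            return "stubbed ack for " + payload;
        };
        SketchMessageHandler handler = new SketchMessageHandler(stub);
        System.out.println(handler.handle("hello"));
        System.out.println(seen);
    }
}
```

In the actual app the injector supplies MessageHandlerServiceImpl according to the binding in AppModule; the handler code is the same either way.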


With both of these features in place, the Karyon2-based app is greatly simplified. The complete working app is in my GitHub repository: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong
