package org.bk.samplepong.app; ..... public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> { private final String healthCheckUri; private final HealthCheckEndpoint healthCheckEndpoint; private final ObjectMapper objectMapper = new ObjectMapper(); public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) { this.healthCheckUri = healthCheckUri; this.healthCheckEndpoint = healthCheckEndpoint; } @Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { if (request.getUri().startsWith(healthCheckUri)) { return healthCheckEndpoint.handle(request, response); } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) { return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8"))) .map(s -> { try { Message m = objectMapper.readValue(s, Message.class); return m; } catch (IOException e) { throw new RuntimeException(e); } }) .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong")) .flatMap(ack -> { try { return response.writeStringAndFlush(objectMapper.writeValueAsString(ack)); } catch (Exception e) { response.setStatus(HttpResponseStatus.BAD_REQUEST); return response.close(); } } ); } else { response.setStatus(HttpResponseStatus.NOT_FOUND); return response.close(); } } }
The issues are:
- The routing logic is not centralized, the request handler has both the routing logic and the processing logic
- The dependencies are not injected in cleanly.
Looking at the Karyon2 samples, both of these issues are actually very cleanly addressed now which I wanted to document here.
Routing
Routing can be centralized using a custom Rx-netty RequestHandler called the SimpleUriRouter
The routes can be registered the following way using SimpleRouter which is being created here using a Guice Provider:import com.google.inject.Inject; import com.google.inject.Provider; import io.netty.buffer.ByteBuf; import netflix.karyon.health.HealthCheckHandler; import netflix.karyon.transport.http.SimpleUriRouter; import netflix.karyon.transport.http.health.HealthCheckEndpoint; import org.bk.samplepong.app.ApplicationMessageHandler; import org.bk.samplepong.common.health.HealthCheck; public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> { @Inject private HealthCheck healthCheck; @Inject private ApplicationMessageHandler applicationMessageHandler; @Override public SimpleUriRouter get() { SimpleUriRouter simpleUriRouter = new SimpleUriRouter(); simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck)); simpleUriRouter.addUri("/message", applicationMessageHandler); return simpleUriRouter; } }
This router can now be registered via a custom guice module the following way:
public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> { public KaryonAppModule() { super("routerModule", ByteBuf.class, ByteBuf.class); } @Override protected void configureServer() { bindRouter().toProvider(new AppRouteProvider()); interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class); server().port(8888); } }
This is essentially it, now the routing logic is cleanly separated from the processing logic.
Dependency Injection
Dependency injection is handled via custom guice modules. I have a service, call it the MessageHandlerService, which takes in a message and returns an Acknowledgement, this service is defined as follows:
public class MessageHandlerServiceImpl implements MessageHandlerService { private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class); public Observable<MessageAcknowledgement> handleMessage(Message message) { return Observable.<MessageAcknowledgement>create(s -> { s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong")); s.onCompleted(); }); } }
Now, I have a guice module which specifies the binding between MessageHandlerService interface and the concrete MessageHandlerServiceImpl:
public class AppModule extends AbstractModule { @Override protected void configure() { bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON); } }
With this in place, the MessageHandlerService can be injected in:
public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> { private final ObjectMapper objectMapper = new ObjectMapper(); private final MessageHandlerService messageHandlerService; @Inject public ApplicationMessageHandler(MessageHandlerService messageHandlerService) { this.messageHandlerService = messageHandlerService; } @Override public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8"))) .map(s -> { try { Message m = objectMapper.readValue(s, Message.class); return m; } catch (IOException e) { throw new RuntimeException(e); } }) .flatMap(messageHandlerService::handleMessage) .flatMap(ack -> { try { return response.writeStringAndFlush(objectMapper.writeValueAsString(ack)); } catch (Exception e) { response.setStatus(HttpResponseStatus.BAD_REQUEST); return response.close(); } } ); } }
With both these features implemented, the app using Karyon2 is also greatly simplified and I have the complete working app in my github repository here: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong
No comments:
Post a Comment