package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                        try {
                            return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                        } catch (Exception e) {
                            response.setStatus(HttpResponseStatus.BAD_REQUEST);
                            return response.close();
                        }
                    });
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}
The issues are:
- The routing logic is not centralized: the request handler contains both the routing logic and the processing logic.
- The dependencies are not injected cleanly.
The Karyon2 samples now address both of these issues very cleanly, and I wanted to document how here.
Routing
Routing can be centralized using a custom RxNetty RequestHandler called SimpleUriRouter.
The routes can be registered with SimpleUriRouter as follows; here the router is created by a Guice Provider:
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.buffer.ByteBuf;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.transport.http.SimpleUriRouter;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.app.ApplicationMessageHandler;
import org.bk.samplepong.common.health.HealthCheck;
public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> {
    @Inject
    private HealthCheck healthCheck;

    @Inject
    private ApplicationMessageHandler applicationMessageHandler;

    @Override
    public SimpleUriRouter<ByteBuf, ByteBuf> get() {
        // Use the parameterized type rather than the raw SimpleUriRouter
        SimpleUriRouter<ByteBuf, ByteBuf> simpleUriRouter = new SimpleUriRouter<>();
        simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck));
        simpleUriRouter.addUri("/message", applicationMessageHandler);
        return simpleUriRouter;
    }
}
This router can now be registered via a custom Guice module as follows:
public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> {
    public KaryonAppModule() {
        super("routerModule", ByteBuf.class, ByteBuf.class);
    }

    @Override
    protected void configureServer() {
        bindRouter().toProvider(new AppRouteProvider());
        interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class);
        server().port(8888);
    }
}
That is essentially it: the routing logic is now cleanly separated from the processing logic.
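To make the idea of centralized routing concrete, here is a minimal, framework-free sketch. It is not Karyon's implementation: `Handler`, `PrefixRouter`, and `RouterSketch` are hypothetical names, requests are reduced to a URI string, and plain prefix matching is an assumption made for brevity rather than a description of how SimpleUriRouter matches URIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a request handler: takes a URI, returns a response body.
interface Handler {
    String handle(String uri);
}

// A router is itself a handler: it owns the URI-to-handler table and only dispatches.
class PrefixRouter implements Handler {
    // LinkedHashMap keeps registration order, so earlier routes win on overlap.
    private final Map<String, Handler> routes = new LinkedHashMap<>();

    PrefixRouter addUri(String prefix, Handler handler) {
        routes.put(prefix, handler);
        return this;
    }

    @Override
    public String handle(String uri) {
        for (Map.Entry<String, Handler> route : routes.entrySet()) {
            if (uri.startsWith(route.getKey())) {
                return route.getValue().handle(uri);
            }
        }
        // No registered route matched.
        return "404 Not Found";
    }
}

class RouterSketch {
    // All routes are declared in one place; the handlers contain no routing logic.
    static PrefixRouter buildRouter() {
        return new PrefixRouter()
                .addUri("/healthcheck", uri -> "200 OK")
                .addUri("/message", uri -> "Pong");
    }
}
```

The point of the design is that each handler only processes requests, while the decision about which handler sees which URI lives in a single table.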
Dependency Injection
Dependency injection is handled via custom Guice modules. I have a service, MessageHandlerService, which takes in a message and returns an acknowledgement. Its implementation is as follows:
public class MessageHandlerServiceImpl implements MessageHandlerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        return Observable.<MessageAcknowledgement>create(s -> {
            s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong"));
            s.onCompleted();
        });
    }
}
Next, a Guice module specifies the binding between the MessageHandlerService interface and the concrete MessageHandlerServiceImpl:
public class AppModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON);
    }
}
With this in place, the MessageHandlerService can be injected into the request handler:
public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final MessageHandlerService messageHandlerService;

    @Inject
    public ApplicationMessageHandler(MessageHandlerService messageHandlerService) {
        this.messageHandlerService = messageHandlerService;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -> {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .flatMap(messageHandlerService::handleMessage)
                .flatMap(ack -> {
                    try {
                        return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                    } catch (Exception e) {
                        response.setStatus(HttpResponseStatus.BAD_REQUEST);
                        return response.close();
                    }
                });
    }
}
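One practical benefit of passing the service in through the constructor is that the handler depends only on an interface, so a different implementation (for example a stub in a test) can be swapped in without touching the handler. The sketch below illustrates this with plain Java only; it uses no Guice or RxJava, and `AckService`, `PongAckService`, `MessageEndpoint`, and `InjectionSketch` are hypothetical, simplified stand-ins for the types in this post.

```java
// Hypothetical, simplified service interface (strings instead of Message objects).
interface AckService {
    String handleMessage(String id, String payload);
}

// The "real" implementation, analogous in spirit to MessageHandlerServiceImpl.
class PongAckService implements AckService {
    @Override
    public String handleMessage(String id, String payload) {
        return id + ":" + payload + ":Pong";
    }
}

// The endpoint knows only the interface; the implementation arrives via the constructor.
class MessageEndpoint {
    private final AckService ackService;

    MessageEndpoint(AckService ackService) {
        this.ackService = ackService;
    }

    String handle(String id, String payload) {
        return ackService.handleMessage(id, payload);
    }
}

class InjectionSketch {
    // Plays the role a DI container's module would: picks the implementation in one place.
    static MessageEndpoint wireProduction() {
        return new MessageEndpoint(new PongAckService());
    }

    // A test can hand in a stub instead, with no change to MessageEndpoint.
    static MessageEndpoint wireWithStub() {
        return new MessageEndpoint((id, payload) -> "stubbed");
    }
}
```

In the real app, Guice performs the wiring that `wireProduction` does by hand here, based on the binding declared in AppModule.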
With both of these features in place, the Karyon2 app is greatly simplified. The complete working app is in my GitHub repository: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong