Karyon has been changing quite a lot recently, and my objective here is to document a good sample using the newer version. The old Karyon (call it Karyon1) was based on the JAX-RS 1.0 specification with Jersey as the implementation. The newer version (Karyon2) still supports Jersey but also encourages the use of RxNetty, a layer on top of Netty with support for RxJava.
With that said, let me jump into a sample. My objective with this sample is to create a "pong" micro-service which takes a POSTed "message" and returns an "acknowledgement".
The following is a sample request:
{ "id": "id", "payload":"Ping" }
And an expected response:
{"id":"id","received":"Ping","payload":"Pong"}
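The two JSON shapes above map onto the Message and MessageAcknowledgement classes that the handler code uses. The post does not show these classes, so the following is only a minimal sketch of what they might look like, inferred from the JSON fields and from the calls made in the handler (getId, getPayload, and a three-argument constructor). The setters and the no-argument constructor are assumptions, added because a JSON mapper such as Jackson typically needs them to populate an object.

```java
// Hypothetical domain classes; the original post does not show them.
// In the project each would normally be a public class in its own file
// under org.bk.samplepong.domain. They are package-private here only so
// that both fit in one listing.

class Message {
    private String id;
    private String payload;

    // No-argument constructor and setters so a JSON mapper can fill in the fields.
    public Message() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}

class MessageAcknowledgement {
    private final String id;
    private final String received;
    private final String payload;

    // Argument order follows the call in the handler: id, received payload, reply payload.
    public MessageAcknowledgement(String id, String received, String payload) {
        this.id = id;
        this.received = received;
        this.payload = payload;
    }

    public String getId() {
        return id;
    }

    public String getReceived() {
        return received;
    }

    public String getPayload() {
        return payload;
    }
}
```

The getter names line up with the keys of the expected response (id, received, payload), which is what a bean-style JSON serializer would emit.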
The first step is to create a RequestHandler, which, as the name suggests, is the RxNetty component that deals with routing and handling the incoming request:
package org.bk.samplepong.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.domain.Message;
import org.bk.samplepong.domain.MessageAcknowledgement;
import rx.Observable;

import java.io.IOException;
import java.nio.charset.Charset;

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent()
                    .map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                        try {
                            return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                        } catch (Exception e) {
                            response.setStatus(HttpResponseStatus.BAD_REQUEST);
                            return response.close();
                        }
                    });
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}
This flow is completely asynchronous and managed internally by the RxJava library; Java 8 lambda expressions also help keep the code concise. The one issue you will see here is that the routing logic (which URI goes to which controller) is mixed up with the actual controller logic, and I believe this is being addressed.
Given this RequestHandler, a server can be started in a standalone Java program using raw RxNetty as shown here. This is essentially all there is to it: an endpoint is brought up at port 8080 to handle the requests:
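To illustrate what separating routing from controller logic could look like, here is a rough sketch of a tiny route table. It is not a Karyon or RxNetty API: SimpleRouter and its methods are hypothetical names, and plain strings stand in for the request body and response so that the idea can be shown without any framework types. In the real handler, the controller function would take the HttpServerRequest and HttpServerResponse and return an Observable<Void> instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical route table; not part of Karyon or RxNetty.
class SimpleRouter {

    // One routing rule: an HTTP method plus a URI prefix, mapped to a controller function.
    private static final class Route {
        final String method;
        final String uriPrefix;
        final Function<String, String> controller;

        Route(String method, String uriPrefix, Function<String, String> controller) {
            this.method = method;
            this.uriPrefix = uriPrefix;
            this.controller = controller;
        }

        boolean matches(String requestMethod, String uri) {
            return method.equals(requestMethod) && uri.startsWith(uriPrefix);
        }
    }

    private final List<Route> routes = new ArrayList<>();

    // Registers a rule and returns this router so that calls can be chained.
    SimpleRouter route(String method, String uriPrefix, Function<String, String> controller) {
        routes.add(new Route(method, uriPrefix, controller));
        return this;
    }

    // Applies the first matching controller; an empty result means no rule matched,
    // which the caller would translate into a 404 response.
    Optional<String> dispatch(String method, String uri, String body) {
        for (Route r : routes) {
            if (r.matches(method, uri)) {
                return Optional.of(r.controller.apply(body));
            }
        }
        return Optional.empty();
    }
}
```

With something like this, the if/else chain in the handler shrinks to a single dispatch call, and each controller can be written and tested without knowing which URI it is mounted on.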
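package org.bk.samplepong.app;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public final class RxNettyExample {

    public static void main(String... args) throws Exception {
        // The handler's constructor requires a health check URI and endpoint.
        RxNettyHandler handler =
                new RxNettyHandler("/healthcheck", new HealthCheckEndpoint(new HealthCheck()));
        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);
        // Start the server and block the main thread so the process stays up.
        server.startAndWait();
    }
}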
This is, however, the native RxNetty way. For a cloud-ready micro-service a few more things have to happen: the service should register with Eureka, respond to the health checks coming back from Eureka, and be able to load its properties using Archaius.
So with Karyon2, the startup in a main program looks a little different:
package org.bk.samplepong.app;

import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.eureka.KaryonEurekaModule;
import netflix.karyon.servo.KaryonServoModule;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public class SamplePongApp {

    public static void main(String[] args) {
        HealthCheck healthCheckHandler = new HealthCheck();
        Karyon.forRequestHandler(8888,
                new RxNettyHandler("/healthcheck",
                        new HealthCheckEndpoint(healthCheckHandler)),
                new KaryonBootstrapModule(healthCheckHandler),
                new ArchaiusBootstrapModule("sample-pong"),
                KaryonEurekaModule.asBootstrapModule(),
                Karyon.toBootstrapModule(KaryonWebAdminModule.class),
                ShutdownModule.asBootstrapModule(),
                KaryonServoModule.asBootstrapModule()
        ).startAndWaitTillShutdown();
    }
}
Now it is essentially cloud ready: on startup, this version of the program registers cleanly with Eureka and exposes a healthcheck endpoint. It additionally exposes a neat set of admin endpoints at port 8077.
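To try the running service, the sample request from the start of this post can be sent to the /message endpoint on port 8888. The following is a small sketch of building that request with the JDK's own HTTP client, which needs Java 11 or later (newer than the Java 8 used for the service itself). PingRequests and its methods are hypothetical helper names, and the JSON is assembled by hand on the assumption that the id contains no characters that need escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for exercising the pong service; not part of the sample project.
class PingRequests {

    // Builds the JSON body of the sample request. Assumes the id needs no JSON escaping.
    static String pingJson(String id) {
        return "{\"id\":\"" + id + "\",\"payload\":\"Ping\"}";
    }

    // Builds a POST to <baseUrl>/message carrying the ping message.
    static HttpRequest ping(String baseUrl, String id) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/message"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(pingJson(id)))
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(PingRequests.ping("http://localhost:8888", "id"), HttpResponse.BodyHandlers.ofString()), and the response body should be the acknowledgement JSON shown earlier.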