Consider the following set-up: I have two different systems here - a work dispatcher producing "Work Unit"s and a Work Handler consuming them. They talk over a RabbitMQ broker. Just to mix the flow up a bit, I also have a retry mechanism in place which retries a message every 20 seconds in case of a processing failure.
Both these systems are described using the Spring Integration Java DSL. The outbound flow dispatching the WorkUnits looks like this:
@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .log()
                .handle(Amqp.outboundGateway(rabbitConfig.worksRabbitTemplate()))
                .transform(Transformers.fromJson(WorkUnitResponse.class))
                .get();
    }

    @Bean
    public IntegrationFlow handleErrors() {
        return IntegrationFlows.from("errorChannel")
                .transform((MessagingException e) -> e.getFailedMessage().getPayload())
                .transform(Transformers.fromJson(WorkUnit.class))
                .transform((WorkUnit failedWorkUnit) ->
                        new WorkUnitResponse(failedWorkUnit.getId(), failedWorkUnit.getDefinition(), false))
                .get();
    }
}
This is eminently readable - the "Work Unit" comes in through the "worksChannel" and is dispatched to a RabbitMQ queue after being transformed to JSON. Note that the dispatch is via an outbound gateway; this means that Spring Integration puts the necessary infrastructure in place to wait for a reply from the remote system. In case of an error, say if the reply does not arrive in time, a stock response is provided back to the user.
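The RabbitConfig referenced above is not shown in this post; below is a minimal sketch of what the template used by the outbound gateway could look like. The routing key and the reply timeout are assumptions for illustration, not the actual configuration from the sample.

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    // Template used by the outbound gateway to send the request and wait for a reply
    @Bean
    public RabbitTemplate worksRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setRoutingKey("work.queue"); // hypothetical queue name
        template.setReplyTimeout(5000);       // how long to wait for a reply before the error flow kicks in
        return template;
    }
}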
On the Work Handler side a similar flow handles the message:
@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(Amqp.inboundGateway(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count < 3 : true",
                        f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .transform(Transformers.toJson())
                .get();
    }
}
The only wrinkle in this flow is the retry logic, which discards the message after 3 retries. If you are interested in the details of how the retry is hooked up, I have more details here.
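The retry itself is driven purely by RabbitMQ: a failed message is dead-lettered into a holding queue with a TTL, and once the TTL expires it is routed back to the work queue, incrementing the x-death count that the filter above inspects. The sketch below shows one common way to declare such a topology with Spring AMQP; the queue and exchange names and the placement of the 20 second TTL are assumptions for illustration, not the exact declarations from the sample.

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RetryQueueConfig {

    // Main work queue: rejected messages are dead-lettered to the retry exchange
    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable("work.queue")
                .withArgument("x-dead-letter-exchange", "work.retry.exchange")
                .build();
    }

    // Holding queue: messages sit here for 20 seconds, then are routed back to the work queue
    @Bean
    public Queue workRetryQueue() {
        return QueueBuilder.durable("work.retry.queue")
                .withArgument("x-message-ttl", 20000)
                .withArgument("x-dead-letter-exchange", "")             // default exchange
                .withArgument("x-dead-letter-routing-key", "work.queue")
                .build();
    }

    @Bean
    public DirectExchange workRetryExchange() {
        return new DirectExchange("work.retry.exchange");
    }

    @Bean
    public Binding workRetryBinding() {
        return BindingBuilder.bind(workRetryQueue()).to(workRetryExchange()).with("work.queue");
    }
}

For this to kick in, the listener container on the consuming side also needs to reject failed messages without requeueing them (for example, defaultRequeueRejected set to false), so that RabbitMQ dead-letters them into the retry exchange.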
So now, given this fairly involved flow, here is what it looks like with Spring Cloud Sleuth and Zipkin integrated:
Spring Cloud Sleuth intercepts every message channel and tags the message as it flows through the channel.
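One piece of configuration worth calling out is the sampler - by default only a fraction of traces is reported to Zipkin, so for experimenting with a flow like this it helps to sample everything. A minimal sketch is below, assuming the Sleuth versions of that era where AlwaysSampler is the way to do this; the configuration class name is mine.

import org.springframework.cloud.sleuth.Sampler;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SleuthConfig {

    // Report every trace to Zipkin rather than the default sampled percentage -
    // handy for local experimentation, probably too chatty for production
    @Bean
    public Sampler defaultSampler() {
        return new AlwaysSampler();
    }
}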
Now for something a little more interesting: if the flow were more complex - with 3 retries, each 20 seconds apart - the path is again beautifully brought out by Spring Cloud Sleuth and its integration with Zipkin.
Conclusion
If you maintain a Spring Integration based flow, Spring Cloud Sleuth is a worthwhile addition to the project - it can trace the runtime path of a message and show it visually using the Zipkin UI. I look forward to exploring more of the nuances of this excellent project.
The sample that I have demonstrated here is available in my GitHub repo - https://github.com/bijukunjummen/si-with-sleuth-sample