Friday, October 21, 2016

Tracing Spring Integration Flow with Spring Cloud Sleuth

Spring Cloud Sleuth is an awesome project that provides a way to trace requests that span multiple systems. Spring Cloud Sleuth can optionally export this trace data to Zipkin, where it can be visualized in a neat way. I especially love the fact that Spring Cloud Sleuth integrates deeply with Spring Integration and can nicely trace out the flow of a message.


Consider the following -



I have two different systems here - a Work Dispatcher producing "Work Unit"s and a Work Handler consuming them. They talk over a RabbitMQ broker. Just to mix the flow up a bit, I also have a retry mechanism in place which retries the message every 20 seconds in case of a processing failure.
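
The payload being passed around is a simple "Work Unit". As a rough sketch, with the shape of the classes assumed from the way they are used in the flows below (the sample repo has the authoritative versions), they look something like this:

// Rough sketches of the payload classes - fields inferred from the flows below
public class WorkUnit {
    private String id;
    private String definition;

    // getters and setters omitted for brevity
}

public class WorkUnitResponse {
    private String id;
    private String definition;
    private boolean success;

    public WorkUnitResponse() {
    }

    public WorkUnitResponse(String id, String definition, boolean success) {
        this.id = id;
        this.definition = definition;
        this.success = success;
    }

    // getters and setters omitted for brevity
}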



Both of these systems are described using the Spring Integration Java DSL. The outbound flow dispatching the WorkUnits looks like this:

@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .log()
                .handle(Amqp.outboundGateway(rabbitConfig.worksRabbitTemplate()))
                .transform(Transformers.fromJson(WorkUnitResponse.class))
                .get();
    }

    @Bean
    public IntegrationFlow handleErrors() {
        return IntegrationFlows.from("errorChannel")
                .transform((MessagingException e) -> e.getFailedMessage().getPayload())
                .transform(Transformers.fromJson(WorkUnit.class))
                .transform((WorkUnit failedWorkUnit) -> new WorkUnitResponse(failedWorkUnit.getId(), failedWorkUnit.getDefinition(), false))
                .get();
    }

}

This is eminently readable - the "Work Unit" comes in through the "works channel" and is dispatched to a RabbitMQ queue after being transformed to JSON. Note that the dispatch is via an outbound gateway, which means that Spring Integration puts the necessary infrastructure in place to wait for a reply from the remote system. In case of an error, say if the reply does not come back in time, a stock response is provided back to the user.
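
Not shown above is how a "Work Unit" lands on the "worksChannel" in the first place. One way to do it, sketched here as an assumption rather than the actual code in the sample, is a messaging gateway that the dispatcher's application code can call:

// A hypothetical gateway - sends a WorkUnit into "worksChannel" and blocks
// until the WorkUnitResponse reply (or the error fallback above) comes back
@MessagingGateway
public interface WorkUnitGateway {

    @Gateway(requestChannel = "worksChannel")
    WorkUnitResponse generate(WorkUnit workUnit);
}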

On the Work Handler side a similar flow handles the message:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundGateway(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count < 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .transform(Transformers.toJson())
                .get();
    }

}
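
The "workHandler" referenced in the flow is a plain bean with a "process" method. A minimal sketch of what it might look like (the real implementation lives in the sample repo):

// A hypothetical implementation of the "workHandler" bean - an exception
// thrown from process() is what kicks off the retry path described next
@Service
public class WorkHandler {

    public WorkUnitResponse process(WorkUnit workUnit) {
        // do the actual work here
        return new WorkUnitResponse(workUnit.getId(), workUnit.getDefinition(), true);
    }
}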

The only wrinkle in this flow is the retry logic, which discards the message after 3 retries. If you are interested in the details of how the retry is hooked up, I have more details here.
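
In short, the pattern is the usual RabbitMQ dead-letter based retry: a failed message is dead-lettered into a holding queue with a 20 second TTL, expires there, and is routed back to the work queue - each round trip increments the "x-death" count that the filter above checks. A rough sketch of that wiring, with all queue and exchange names being assumptions:

// A rough sketch of dead-letter based retry wiring - queue and exchange names
// are assumptions, the linked post has the actual configuration.
// A rejected message moves to the wait queue, sits there for 20 seconds and is
// then routed back to the work queue, growing its "x-death" header each time.
@Bean
public Queue worksQueue() {
    return QueueBuilder.durable("works.queue")
            .withArgument("x-dead-letter-exchange", "works.retry.exchange")
            .build();
}

@Bean
public Queue worksRetryWaitQueue() {
    return QueueBuilder.durable("works.retry.wait.queue")
            .withArgument("x-message-ttl", 20000)
            .withArgument("x-dead-letter-exchange", "works.exchange")
            .build();
}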


So now, given this fairly involved flow, here is what it looks like with Spring Cloud Sleuth and Zipkin integrated:



Spring Cloud Sleuth intercepts every message channel and tags the message as it flows through the channel.
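
Under the covers this is a channel interceptor stamping tracing information onto the message headers. To see it, a quick logging step like the following could be dropped into either flow - note that the header names shown are the ones used by the Sleuth 1.x messaging support and are worth double-checking against the version in use:

// A peek at the tracing headers Sleuth adds to the message - this .handle step
// is purely illustrative and is not part of the flows above
.handle((payload, headers) -> {
    System.out.println("traceId=" + headers.get("X-B3-TraceId")
            + ", spanId=" + headers.get("X-B3-SpanId"));
    return payload;
})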


Now for something a little more interesting: if the flow is more complex, with 3 retries each 20 seconds apart, the path of the message is again beautifully brought out by Spring Cloud Sleuth and its integration with Zipkin.


Conclusion


If you maintain a Spring Integration based flow, Spring Cloud Sleuth is a worthwhile addition to the project - it can trace the runtime path of a message and show it visually using the Zipkin UI. I look forward to exploring more of the nuances of this excellent project.


The sample that I have demonstrated here is available in my GitHub repo - https://github.com/bijukunjummen/si-with-sleuth-sample