Saturday, December 13, 2014

RabbitMQ - Processing messages serially using Spring integration Java DSL

If you ever have a need to process messages serially with RabbitMQ with a cluster of listeners processing the messages, the best way that I have seen is to use a "exclusive consumer" flag on a listener with 1 thread on each listener processing the messages.

Exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and 1 thread on that consumer ensures that the messages are processed serially. There is a catch however, I will go over it later.

Let me demonstrate this behavior with a Spring Boot and Spring Integration based RabbitMQ message consumer.

First, this is the configuration for setting up a queue using Spring java configuration, note that since this is a Spring Boot application, it automatically creates a RabbitMQ connection factory when the Spring-amqp library is added to the list of dependencies:

@Configuration
@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public Queue sampleQueue() {
        return new Queue("sample.queue", true, false, false);
    }

}

Given this sample queue, a listener which gets the messages from this queue and processes them looks like this, the flow is written using the excellent Spring integration Java DSL library:

@Configuration
public class RabbitInboundFlow {
    private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);

    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(this.connectionFactory);
        listenerContainer.setQueues(this.rabbitConfig.sampleQueue());
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setExclusive(true);
        return listenerContainer;
    }

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer()))
                .transform(Transformers.objectToString())
                .handle((m) -> {
                    logger.info("Processed  {}", m.getPayload());
                })
                .get();
    }

}


The flow is very concisely expressed in the inboundFlow method, a message payload from RabbitMQ is transformed from byte array to String and finally processed by simply logging the message to the logs

The important part of the flow is the listener configuration, note the flag which sets the consumer to be an exclusive consumer and within this consumer the number of threads processing is set to 1. Given this even if multiple instances of the application is started up only 1 of the listeners will be able to connect and process messages.


Now for the catch, consider a case where the processing of messages takes a while to complete and rolls back during processing of the message. If the instance of the application handling the message were to be stopped in the middle of processing such a message, then the behavior is a different instance will start handling the messages in the queue, when the stopped instance rolls back the message, the rolled back message is then delivered to the new exclusive consumer, thus getting a message out of order.

If you are interested in exploring this further, here is a github project to play with this feature: https://github.com/bijukunjummen/test-rabbit-exclusive

2 comments:

  1. Any numbers on throughput msg/second (incl. size per msg ofc) ?

    ReplyDelete
    Replies
    1. Hi Rudiger, it will be as slow as the processing time of a single message as the messages are being processed one at a time serially.

      Delete