Thursday, April 9, 2020

Processing SQS Messages using Spring Boot and Project Reactor - Part 2

This is a follow-up to my blog post about processing SQS messages efficiently using Spring Boot and Project Reactor.

There are a few gaps in the approach that I listed in the first part:

1. Handling failures in SQS client calls
2. The approach processes only 1 message from SQS at a time; how can it be parallelized?
3. It does not handle errors; any error in the pipeline would break the entire process and stop the reading of newer messages from the queue.


Recap

Just to recap, the previous post demonstrated creating a pipeline to process messages from an AWS SQS queue using the excellent Project Reactor.


The end result of that exercise was a pipeline which looks like this:
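
A simplified sketch of that pipeline in code (the "processMessage" step below is a stand-in for the actual handling-and-deletion logic from the first post):

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    // emit a batch of up to 5 messages read from SQS
    sink.next(sqsClient.receiveMessage(receiveMessageRequest).messages())
}
    .flatMapIterable(Function.identity())
    .flatMap { message -> processMessage(message) }
    .subscribe()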




Given this pipeline, let me now go over how to bridge the gaps:

Handling SQS Client Failures


This is the function that generates the stream of messages read from SQS.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    // long-poll SQS: wait up to 10 seconds for up to 5 messages per call
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    // emit the batch; "generate" is invoked again as downstream demand comes in
    sink.next(messages)
}
    .flatMapIterable(Function.identity()) // flatten each batch into individual messages

Now consider a case where the "sqsClient" above has a connectivity issue: the behavior of a Flux is that any error terminates the stream. This, of course, will not do for a service whose job is to process messages for as long as the service is running.

The fix is to simply retry the processing flow in case of errors.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()

This would result in the Flux re-establishing the stream of messages in case of any errors up to this point.
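
One caveat (not addressed in this post): a bare "retry()" resubscribes immediately and indefinitely, so a persistent connectivity issue turns into a tight loop of failing calls. A variation worth considering, assuming a reactor-core version (3.3.4 or later) that provides the "reactor.util.retry.Retry" builder, is to back off between attempts:

import reactor.util.retry.Retry
import java.time.Duration

// ... same Flux.generate block as before ...
    .flatMapIterable(Function.identity())
    .retryWhen(
        Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) // effectively unbounded attempts
            .maxBackoff(Duration.ofSeconds(10))              // but wait at most 10s between them
    )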

Processing Messages in Parallel

Project Reactor provides a few ways of parallelizing a processing pipeline. My first attempt at processing in parallel was to add a "subscribeOn" operator to the processing chain.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    .subscribeOn(Schedulers.newElastic("sub"))

However, this is not quite how "subscribeOn" works. The output when I send a few messages to this pipeline is the following:

2020-04-07 20:52:53.241  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.434  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.493  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.538  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.609  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.700  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello

The "sub-3" above is the name of the thread processing the message, and it looks like all the messages are getting processed on the "sub-3" thread and on no other threads!

subscribeOn simply changes the execution context by borrowing "a thread" from this scheduler pool and does not use all the threads in the pool itself.
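
A minimal standalone snippet (not part of the pipeline) that demonstrates this single-thread behavior:

// all five elements print on the same "sub-*" thread: subscribeOn moves the
// whole chain onto one borrowed thread, it does not fan work out over the pool
Flux.range(1, 5)
    .map { v -> "$v on ${Thread.currentThread().name}" }
    .doOnNext { s -> println(s) }
    .subscribeOn(Schedulers.newElastic("sub"))
    .blockLast()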

So how can the processing be parallelized? This StackOverflow answer provides a very good approach, which I am using here: essentially, use a flatMap operator and add the "subscribeOn" operator inside the "flatMap" operator.

This operator eagerly subscribes to its inner publishers and then flattens the result. The trick is that the inner subscribers can be provided their own schedulers, and each subscription will then end up using a thread from the scheduler pool. The number of these concurrent subscribers can be controlled using a "concurrency" parameter passed to the flatMap operator.
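
To see the idea in isolation first, here is a made-up example, followed by the change to the SQS pipeline:

// at most 4 inner subscriptions are in flight at a time, each on
// its own thread borrowed from the "worker" scheduler
Flux.range(1, 10)
    .flatMap({ v ->
        Mono.fromCallable { "$v on ${Thread.currentThread().name}" }
            .doOnNext { s -> println(s) }
            .subscribeOn(Schedulers.newElastic("worker"))
    }, 4)
    .blockLast()

And applied to the SQS pipeline: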


Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    // an intermediate step (elided here) maps each Message to a Pair of its
    // body and a callback that deletes the message from the queue
    .flatMap({ (message: String, deleteHandle: () -> Unit) ->
        task(message)
            .then(Mono.fromSupplier { Try.of { deleteHandle() } })
            .then()
            .subscribeOn(taskScheduler) // each inner subscription runs on the scheduler pool
    }, concurrency)
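
The "taskScheduler" and "concurrency" referenced above are not shown in the snippet; they could be defined along these lines (the names and values here are assumptions, to be tuned for the workload):

// how many inner subscriptions flatMap keeps in flight at a time
val concurrency: Int = 5

// a dedicated pool for the message handling work; the "taskHandler" prefix
// shows up in the thread names in the logs below
val taskScheduler: Scheduler = Schedulers.newElastic("taskHandler")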

An output when processing multiple messages looks like this:

2020-04-08 21:03:24.582  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.815  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-5] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-6] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-7] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.817  INFO 17541 --- [  taskHandler-8] sample.msg.MessageListenerRunner         : Processed Message hello

See how there is more than one thread name (taskHandler-*) in the logs now!


Handling Downstream Errors

One of my previous fixes, with the "retry" operator, was about handling upstream errors from sqsClient connectivity. However, if any step in the pipeline throws an error while messages are being processed, the entire pipeline fails. So it is important to guard EVERY step against failure. A neat way that I have been ensuring errors don't propagate out is to use the excellent vavr library and its "Try" type. The Try type holds one of two outcomes: a successful result (Success) or an exception (Failure). This allows the rest of the pipeline to act on the outcome of the previous step in a measured way:

.flatMap({ (message: String, deleteHandle: () -> Unit) ->
    task(message)
        .then(Mono.fromSupplier { Try.of { deleteHandle() } })
        .doOnNext { t ->
            // a Failure from "deleteHandle" is logged here instead of failing the pipeline
            t.onFailure { e -> LOGGER.error(e.message, e) }
        }
        .then()
        .subscribeOn(taskScheduler)
}, concurrency)

The above snippet demonstrates an approach where I know that "deleteHandle", which is responsible for deleting a message, can throw an exception. Try captures this, and if there is an error, logs it; this way the exception does not short-circuit the flow of messages.
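
As a small standalone illustration of how "Try" behaves (a made-up example, not from the pipeline):

import io.vavr.control.Try

// a Success wraps the value, a Failure captures the exception instead of throwing it
val ok: Try<Int> = Try.of { 42 }
val failed: Try<Int> = Try.of { throw IllegalStateException("delete failed") }

failed.onFailure { e -> println("handled: ${e.message}") } // runs the handler, does not rethrow
println(ok.getOrElse(-1))     // prints 42
println(failed.getOrElse(-1)) // prints -1, the fallback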



Conclusion

My initial thinking was that just because I had taken a reactive approach to processing messages I would get a huge boost in my SQS message processing pipeline. However, my learning has been that, just like everything else, a Project Reactor based stream requires careful understanding and tuning to process messages efficiently. I am sure there are a few more lessons for me to learn, and I will be documenting those as I do.

This entire sample is available in my GitHub repository here - https://github.com/bijukunjummen/boot-with-sns-sqs

3 comments:

  1. Any observations on performance / scale with Project Reactor?

  2. I doubt this would work correctly.
    You didn't configure any backpressure strategy; if the Flux.generate produces faster and the downstream buffer is full, then the source is canceled and your pipeline is stopped.
    Basically what you're trying to do is similar to:

    @Test
    void fastProducerSlowConsumerWithFlatMap() {
        AtomicInteger counter = new AtomicInteger(1);
        Flux.generate(synchronousSink -> {
            log("producer", counter.get());
            synchronousSink.next(counter.getAndIncrement());
        })
        .flatMap(v -> Flux.just(v)
                .delayElements(Duration.ofMillis(1000))
                .doOnNext(e -> log("consumer", e))
                .subscribeOn(Schedulers.boundedElastic()), 10)
        .subscribe();
    }

    Replies
    1. I think it does, David - Flux.generate handles backpressure internally; see this post: http://www.java-allandsundry.com/2020/07/backpressure-in-project-reactor.html.
