There are a few gaps in the approach from the first part:
1. Failures in SQS client calls are not handled.
2. The approach processes only one message from SQS at a time. How can it be parallelized?
3. Errors are not handled: any error in the pipeline would break the entire process and stop the reading of newer messages from the queue.
Recap
Just to recap, the previous post demonstrated creating a pipeline to process messages from an AWS SQS queue using the excellent Project Reactor. The end result of that exercise was a pipeline which looks like this:
Given this pipeline, let me now go over how to bridge the gaps:
Handling SQS Client Failures
This is the function that generates the stream of messages read from SQS.
Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
Now consider a case where the "sqsClient" above has a connectivity issue. The behavior of Flux is that the stream is terminated in case of an error. This, of course, will not do for a service whose job is to process messages for as long as the service is running.
The fix is simply to retry the processing flow in case of errors.
Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
This would result in Flux re-establishing the stream of messages in case of any errors up to this point.
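The essence of what "retry()" does here can be sketched in plain Kotlin, without Reactor: keep re-running the receive loop whenever it throws, so one failed call does not stop polling. This is just a conceptual analogue under my own assumptions (the `retryForever` helper and the simulated failure are hypothetical), not the Reactor implementation:

```kotlin
// Conceptual analogue of Flux.retry(): re-run the failing block until it
// succeeds, instead of letting one exception terminate the whole loop.
fun <T> retryForever(block: () -> T): T {
    while (true) {
        try {
            return block()
        } catch (e: Exception) {
            // In the Reactor pipeline this is where the stream is re-subscribed
            println("Retrying after failure: ${e.message}")
        }
    }
}

fun main() {
    var attempts = 0
    val result = retryForever {
        attempts++
        // Simulate a transient connectivity issue on the first two calls
        if (attempts < 3) throw RuntimeException("connectivity issue")
        "messages"
    }
    println("result=$result after $attempts attempts")
}
```

Note that a bare `retry()` resubscribes immediately; in a real service a backoff variant would usually be preferable so a persistent outage does not turn into a hot retry loop.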
Processing Messages in Parallel
Project Reactor provides a few ways of parallelizing a processing pipeline. My first attempt at processing in parallel was to add a "subscribeOn" operator to the processing chain:

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    .subscribeOn(Schedulers.newElastic("sub"))
However, this is not quite how "subscribeOn" works. Here is the output when I send a few messages through this pipeline:
2020-04-07 20:52:53.241  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-07 20:52:53.434  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-07 20:52:53.493  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-07 20:52:53.538  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-07 20:52:53.609  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-07 20:52:53.700  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner : Processed Message hello
"sub-3" above is the name of the thread processing the message, and it looks like all the messages are being processed on the "sub-3" thread and no other!
"subscribeOn" simply changes the execution context by borrowing a single thread from the scheduler pool; it does not fan work out across all the threads in the pool.
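The single-thread behavior observed above can be reproduced without Reactor: a single-thread executor, like a chain moved onto one scheduler thread by "subscribeOn", runs every task sequentially on the same thread. This is a rough plain-Kotlin analogue, with the thread name "sub-3" chosen just to mirror the logs above:

```kotlin
import java.util.concurrent.Executors

fun main() {
    // One thread "borrowed" for everything, like subscribeOn moving the whole
    // chain onto a single scheduler thread.
    val single = Executors.newSingleThreadExecutor { r -> Thread(r, "sub-3") }

    // Submit several tasks and record which thread each one ran on
    val threadNames = (1..5).map {
        single.submit<String> { Thread.currentThread().name }.get()
    }

    println(threadNames)
    check(threadNames.toSet().size == 1) // every task ran on the same thread
    single.shutdown()
}
```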
So how can the processing be parallelized? This StackOverflow answer provides a very good approach, which I am using here: use a "flatMap" operator and add the "subscribeOn" operator inside it.
The "flatMap" operator eagerly subscribes to its inner publishers and then flattens the result. The trick is that the inner subscribers can be provided their own scheduler, and each subscription will end up using a thread from the scheduler pool. The number of these concurrent subscribers can be controlled using a "concurrency" parameter passed to the "flatMap" operator.
Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    .flatMap({ (message: String, deleteHandle: () -> Unit) ->
        task(message)
            .then(Mono.fromSupplier { Try.of { deleteHandle() } })
            .then()
            .subscribeOn(taskScheduler)
    }, concurrency)
and the output when processing multiple messages looks like this:
2020-04-08 21:03:24.582  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-08 21:03:24.815  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-5] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-6] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-7] sample.msg.MessageListenerRunner : Processed Message hello
2020-04-08 21:03:24.817  INFO 17541 --- [  taskHandler-8] sample.msg.MessageListenerRunner : Processed Message hello
See how there is more than one thread name (taskHandler-*) in the logs now!
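The effect of the "concurrency" parameter can be approximated in plain Kotlin with a fixed thread pool: parallelism is capped at the pool size, just as "flatMap" caps the number of in-flight inner subscriptions. A rough sketch (pool size, thread naming, and the simulated work are my own assumptions, not the post's actual pipeline):

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

fun main() {
    // Analogue of flatMap(..., concurrency): at most `concurrency` messages
    // are processed at the same time.
    val concurrency = 4
    val nameCounter = AtomicInteger()
    val pool = Executors.newFixedThreadPool(concurrency) { r ->
        Thread(r, "taskHandler-${nameCounter.incrementAndGet()}")
    }

    val threadsUsed = ConcurrentHashMap.newKeySet<String>()
    val futures = (1..20).map {
        pool.submit {
            threadsUsed.add(Thread.currentThread().name)
            Thread.sleep(10) // simulate per-message work
        }
    }
    futures.forEach { it.get() }

    println("distinct worker threads: ${threadsUsed.size}")
    check(threadsUsed.size in 1..concurrency) // never exceeds the cap
    pool.shutdown()
}
```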
Handling downstream errors
One of my previous fixes, the "retry" operator, handled upstream errors such as sqsClient connectivity issues. However, as messages are being processed in the pipeline, any step can throw an error, and the entire pipeline would then fail. So it is important to guard EVERY step against failure. A neat way I have been ensuring that errors don't propagate out is to use the excellent vavr library and its "Try" type. The Try type holds one of two outcomes: a successful value (Success) or an exception (Failure). This allows the rest of the pipeline to act on the outcome of the previous step in a measured way:

.flatMap({ (message: String, deleteHandle: () -> Unit) ->
    task(message)
        .then(Mono.fromSupplier { Try.of { deleteHandle() } })
        .doOnNext { t -> t.onFailure { e -> LOGGER.error(e.message, e) } }
        .then()
        .subscribeOn(taskScheduler)
}, concurrency)
The above snippet demonstrates this approach: I know that "deleteHandle", which is responsible for deleting a message, can throw an exception. Try captures this and, if there is an error, logs it; this way the exception does not short-circuit the flow of messages.
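For readers who don't want the vavr dependency, Kotlin's built-in `runCatching` behaves similarly to Try: it captures either a success value or the thrown exception, so a failure can be logged without propagating. A small sketch, where `deleteMessage` and the "bad" handle are hypothetical stand-ins for the post's `deleteHandle`:

```kotlin
// Hypothetical delete that fails for one specific handle
fun deleteMessage(handle: String) {
    if (handle == "bad") throw IllegalStateException("delete failed for $handle")
}

// runCatching wraps the call; onFailure logs instead of rethrowing,
// mirroring Try.of { deleteHandle() } + onFailure { LOGGER.error(...) }
fun processDelete(handle: String): Boolean =
    runCatching { deleteMessage(handle) }
        .onFailure { e -> println("ERROR: ${e.message}") }
        .isSuccess

fun main() {
    println(processDelete("good")) // true
    println(processDelete("bad"))  // false: error logged, not thrown
}
```

The important property in both cases is the same: the exception becomes a value the pipeline can inspect, rather than a signal that terminates the stream.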
Conclusion
My initial thinking was that just because I had taken a reactive approach to processing messages, I would get a huge boost in my SQS message processing pipeline. However, my learning has been that, just like everything else, a Project Reactor based stream requires careful understanding and tuning to process messages efficiently. I am sure there are a few more lessons for me to learn, and I will document those as I do. This entire sample is available in my GitHub repository here - https://github.com/bijukunjummen/boot-with-sns-sqs
Any observations on performance / scale with Project Reactor ?
I doubt this would work correctly.
ReplyDeleteYou didn't configure any backpressure strategy, if the Flux.generate produce faster and the downstream buffer is full, then the source is canceled and your pipeline is stopped.
Basically what you're trying to do is similar to:
@Test
void fastProducerSlowConsumerWithFlatMap() {
    AtomicInteger counter = new AtomicInteger(1);
    Flux.generate(synchronousSink -> {
        log("producer", counter.get());
        synchronousSink.next(counter.getAndIncrement());
    })
    .flatMap(v -> Flux.just(v)
            .delayElements(Duration.ofMillis(1000))
            .doOnNext(e -> log("consumer", e))
            .subscribeOn(Schedulers.boundedElastic()), 10)
    .subscribe();
}
I think it does, David. Flux.generate handles backpressure internally - see this post: http://www.java-allandsundry.com/2020/07/backpressure-in-project-reactor.html