Spring Web Async support
Consider a service call that takes a little while to process, simulated with a delay:public CompletableFuture<Message> getAMessageFuture() { return CompletableFuture.supplyAsync(() -> { logger.info("Start: Executing slow task in Service 1"); Util.delay(1000); logger.info("End: Executing slow task in Service 1"); return new Message("data 1"); }, futureExecutor); }
If I were to call this service in a user request flow, the traditional blocking controller flow would look like this:
@RequestMapping("/getAMessageFutureBlocking") public Message getAMessageFutureBlocking() throws Exception { return service1.getAMessageFuture().get(); }
A better approach is to use the Spring Asynchronous support to return the result back to the user when available from the CompletableFuture, this way not holding up the containers thread:
@RequestMapping("/getAMessageFutureAsync") public DeferredResult<Message> getAMessageFutureAsync() { DeferredResult<Message> deffered = new DeferredResult<>(90000); CompletableFuture<Message> f = this.service1.getAMessageFuture(); f.whenComplete((res, ex) -> { if (ex != null) { deffered.setErrorResult(ex); } else { deffered.setResult(res); } }); return deffered; }
Using Observable in a Async Flow
Now to the topic of this article, I have been using Rx-java's excellent Observable type as my service return types lately and wanted to ensure that the web layer also remains asynchronous in processing the Observable type returned from a service call.
Consider the service that was described above now modified to return an Observable:
public Observable<Message> getAMessageObs() { return Observable.<Message>create(s -> { logger.info("Start: Executing slow task in Service 1"); Util.delay(1000); s.onNext(new Message("data 1")); logger.info("End: Executing slow task in Service 1"); s.onCompleted(); }).subscribeOn(Schedulers.from(customObservableExecutor)); }
I can nullify all the benefits of returning an Observable by ending up with a blocking call at the web layer, a naive call will be the following:
@RequestMapping("/getAMessageObsBlocking") public Message getAMessageObsBlocking() { return service1.getAMessageObs().toBlocking().first(); }
To make this flow async through the web layer, a better way to handle this call is the following, essentially by transforming Observable to Spring's DeferredResult type:
@RequestMapping("/getAMessageObsAsync") public DeferredResult<Message> getAMessageAsync() { Observable<Message> o = this.service1.getAMessageObs(); DeferredResult<Message> deffered = new DeferredResult<>(90000); o.subscribe(m -> deffered.setResult(m), e -> deffered.setErrorResult(e)); return deffered; }
This would ensure that the thread handling the user flow would return as soon as the service call is complete and the user response will be processed reactively once the observable starts emitting values.
If you are interested in exploring this further, here is a github repo with working samples: https://github.com/bijukunjummen/spring-web-observable.
References:
Spring's reference guide on async flows in the web tier: http://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html#mvc-ann-asyncMore details on Spring DeferredResult by the inimitable Tomasz Nurkiewicz at the NoBlogDefFound blog - http://www.nurkiewicz.com/2013/03/deferredresult-asynchronous-processing.html
I'm having trouble using the same approach you describe. You can see my code here: http://pastebin.com/5G4BSW8H
ReplyDeleteThe following exception is thrown when my code is invoked:
java.lang.IllegalStateException: Cannot forward after response has been committed
Let me know if you have any suggestions.
Thanks in advance for your help!
-Tony
Hi Tony, I feel that you may be trying your sample on a older container that may not be completely implementing the servlet 3 api. Can you please share your complete app in a github repo or you can try putting your code in my github project https://github.com/bijukunjummen/spring-web-observable, this is a Spring boot based project, so you should be able to bring up your endpoint using "mvn spring-boot:run" command.
DeleteBiju,
DeleteThanks for your fast reply. I am using the latest version of Spring Boot so I know it's a recent container that fully supports the Servlet 3 API. I was able to make the code work by using DeferredResult<ResponseEntity<User>> so that the response includes a status code. Here is a link to the code that works: http://pastebin.com/ZjBR06YF
Thanks for the good article and your help.
-Tony
any luck there?
DeleteAn exception is probably being thrown but not shown. If you add logging in the error handler (in the subscribe) you'll probably see it.
DeleteGreat article.
ReplyDeleteI tried this with a spring boot application and tested with jmeter( 1000 threads). But the blocking was faster than the deferred result and observable..
I was trying it with the spring boot 1.3 M5, I'll try with the same version you were using.
I did try with callable, which was faster than the blocking approach
Ah, good point, it will depend a lot on how you have configured the different threadpools, will it be possible to share your code somewhere.
DeleteMy code is very similar to what you have on github.
DeleteSo I did some digging about. I noticed that when using callable the thread name when logging that the action had completed was [MvcAsyncxxx].
So took a look in the Spring framework and could see that it was using org.springframework.core.task.SimpleAsyncTaskExecutor, which does not reuse threads.
By default the number of concurrent threads is unlimited, this is configurable.
(http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/core/task/SimpleAsyncTaskExecutor.html)
So I changed:
private final ExecutorService futureExecutor = Executors.newFixedThreadPool(10);
to:
private final ExecutorService futureExecutor = new SimpleAsyncTaskExecutor("DeferredThread-");
Then changed it to:
private final ExecutorService futureExecutor = Executors.newFixedThreadPool(1000, mNamedThreadFactory);
Then changed it to:
private final ExecutorService futureExecutor = Executors.newFixedThreadPool(2000, mNamedThreadFactory);
Here are my results:
Label | #Samples | Average | Median | 90% Line | 95% Line | 99% Line | Min | Max | Error % | Throughput
Blocking | 1000 | 10963 | 10977 | 16891 | 16912 | 16920 | 5002 | 16949 | 0.00% | 37.0
Callable | 1000 | 5004 | 5004 | 5006 | 5007 | 5014 | 5002 | 5028 | 0.00% | 66.0
Deferred (10 threads) | 1000 | 84636 | 91925 | 95683 | 96336 | 97500 | 5063 | 97809 | 82.00% | 9.5
Deferred (SimpleAsync) | 1000 | 5003 | 5003 | 5005 | 5008 | 5022 | 5001 | 5037 | 0.00% | 65.3
Deferred (1K threads) | 1000 | 5002 | 5001 | 5003 | 5006 | 5016 | 5001 | 5051 | 0.00% | 65.7
Observable (1K threads)| 1000 | 5003 | 5002 | 5003 | 5007 | 5032 | 5000 | 6021 | 0.00% | 61.6
Callable | 5000 | 5023 | 5007 | 5047 | 5090 | 5443 | 5001 | 5562 | 0.00% | 276.1
Deferred (1K threads) | 5000 | 10445 | 10457 | 15721 | 15818 | 16012 | 5001 | 16053 | 0.00% | 184.2
Observable (1K threads)| 5000 | 10553 | 10606 | 15990 | 16039 | 16104 | 5000 | 16173 | 0.00% | 184.0
Deferred (2K threads) | 5000 | 5255 | 5236 | 5601 | 5636 | 5739 | 5001 | 5787 | 0.00% | 291.9
Observable (2K threads)| 5000 | 5238 | 5081 | 5756 | 5870 | 6022 | 5002 | 6284 | 0.00% | 286.4
Note: It is worth running the tests a couple of times, due to thread pool being initialised.
Another thing to remember is that at the moment this is just sleeping for 5 seconds, there is no external dependencies.
You must fix ur blog's mobile page layout. When ur code is longer than website's width, i need to scroll the code but because ur mobile page moves to next or previous page while im dragging, i just cannot view the code. It just makes me feel sick. Please fix it. Lemme read ur blog on mobile
ReplyDeleteOh, interesting, I did know there was an issue with mobile display. thanks for your feedback, I will fix it soon
Delete