package obs.threads;

import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class GeneralService {
    private static final Logger logger = LoggerFactory.getLogger(GeneralService.class);

    public Observable<String> getData() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing a Service");
            for (int i = 1; i <= 3; i++) {
                Util.delay(200);
                logger.info("Emitting {}", "root " + i);
                s.onNext("root " + i);
            }
            logger.info("End: Executing a Service");
            s.onCompleted();
        });
    }
}
Now, if I were to subscribe to this service, this way:
@Test
public void testThreadedObservable1() throws Exception {
    Observable<String> ob1 = aService.getData();
    CountDownLatch latch = new CountDownLatch(1);

    ob1.subscribe(s -> {
        Util.delay(500);
        logger.info("Got {}", s);
    }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

    latch.await();
}
All of the emissions and subscriptions will act on the main thread and something along the following lines will be printed:
20:53:29.380 [main] INFO o.t.GeneralService - Start: Executing a Service
20:53:29.587 [main] INFO o.t.GeneralService - Emitting root 1
20:53:30.093 [main] INFO o.t.ThreadedObsTest - Got root 1
20:53:30.298 [main] INFO o.t.GeneralService - Emitting root 2
20:53:30.800 [main] INFO o.t.ThreadedObsTest - Got root 2
20:53:31.002 [main] INFO o.t.GeneralService - Emitting root 3
20:53:31.507 [main] INFO o.t.ThreadedObsTest - Got root 3
20:53:31.507 [main] INFO o.t.GeneralService - End: Executing a Service
By default the emissions are not asynchronous in nature. So now, what is the behavior if subscribeOn is used:
public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);

    private ExecutorService executor1 = Executors.newFixedThreadPool(5,
            new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

    @Test
    public void testSubscribeOn() throws Exception {
        Observable<String> ob1 = aService.getData();
        CountDownLatch latch = new CountDownLatch(1);

        ob1.subscribeOn(Schedulers.from(executor1)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        latch.await();
    }
}
Here I am using Guava's ThreadFactoryBuilder to give each thread in the thread pool a unique name following a pattern. If I were to execute this code, the output would be along these lines:
20:56:47.117 [SubscribeOn-0] INFO o.t.GeneralService - Start: Executing a Service
20:56:47.322 [SubscribeOn-0] INFO o.t.GeneralService - Emitting root 1
20:56:47.828 [SubscribeOn-0] INFO o.t.ThreadedObsTest - Got root 1
20:56:48.032 [SubscribeOn-0] INFO o.t.GeneralService - Emitting root 2
20:56:48.535 [SubscribeOn-0] INFO o.t.ThreadedObsTest - Got root 2
20:56:48.740 [SubscribeOn-0] INFO o.t.GeneralService - Emitting root 3
20:56:49.245 [SubscribeOn-0] INFO o.t.ThreadedObsTest - Got root 3
20:56:49.245 [SubscribeOn-0] INFO o.t.GeneralService - End: Executing a Service
Now the execution has moved away from the main thread, and both the emissions and the subscriber callbacks are being processed on a thread borrowed from the thread pool.
And what happens if observeOn is used:
public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);

    private ExecutorService executor1 = Executors.newFixedThreadPool(5,
            new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

    // Second pool for observeOn; its name pattern matches the [ObserveOn-1] threads in the output.
    private ExecutorService executor2 = Executors.newFixedThreadPool(5,
            new ThreadFactoryBuilder().setNameFormat("ObserveOn-%d").build());

    @Test
    public void testObserveOn() throws Exception {
        Observable<String> ob1 = aService.getData();
        CountDownLatch latch = new CountDownLatch(1);

        ob1.observeOn(Schedulers.from(executor2)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        latch.await();
    }
}
The output is along these lines:
21:03:08.655 [main] INFO o.t.GeneralService - Start: Executing a Service
21:03:08.860 [main] INFO o.t.GeneralService - Emitting root 1
21:03:09.067 [main] INFO o.t.GeneralService - Emitting root 2
21:03:09.268 [main] INFO o.t.GeneralService - Emitting root 3
21:03:09.269 [main] INFO o.t.GeneralService - End: Executing a Service
21:03:09.366 [ObserveOn-1] INFO o.t.ThreadedObsTest - Got root 1
21:03:09.872 [ObserveOn-1] INFO o.t.ThreadedObsTest - Got root 2
21:03:10.376 [ObserveOn-1] INFO o.t.ThreadedObsTest - Got root 3
The emissions are now back on the main thread, but the subscriber callbacks are being processed on a thread from the thread pool.
That is the difference: when subscribeOn is used, the emissions (and, unless an observeOn intervenes, the subscriber callbacks as well) are performed on the specified Scheduler; when observeOn is used, the subscriber callbacks downstream of it are performed on the specified Scheduler!
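For readers who want to see the same split without RxJava on the classpath, here is a rough plain-JDK analogy. It is only a sketch of the idea, not how RxJava implements these operators: the emitting loop is submitted to one executor (standing in for subscribeOn), and each item is handed to a second executor for consumption (standing in for observeOn). The class name `HandOffSketch`, the `run` method and the thread names `producer` and `consumer` are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandOffSketch {

    // Returns a record of which thread emitted and which thread consumed each item.
    public static List<String> run() throws InterruptedException {
        // Hypothetical thread names, chosen only to make the record readable.
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        List<String> record = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        try {
            // Loosely analogous to subscribeOn: the emitting loop runs on "producer".
            producer.execute(() -> {
                for (int i = 1; i <= 3; i++) {
                    String item = "root " + i;
                    record.add("emit " + item + " on " + Thread.currentThread().getName());
                    // Loosely analogous to observeOn: the callback runs on "consumer".
                    consumer.execute(() -> {
                        record.add("got " + item + " on " + Thread.currentThread().getName());
                        done.countDown();
                    });
                }
            });
            done.await(); // wait until all three items have been consumed
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
        return record;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Every "emit" entry should name the producer thread and every "got" entry the consumer thread, although the interleaving of the two kinds of entries can vary from run to run.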
And the output when both are specified is equally predictable: the emissions happen on the subscribeOn Scheduler and the subscriber callbacks on the observeOn Scheduler. In all of these cases I created a Scheduler from a thread pool with 5 threads, but only one thread of the pool was actually used, both for emitting values and for processing them in the subscriber. This is the normal behavior of an Observable: a single subscription is processed sequentially. If you want to make more efficient use of the thread pool, one approach is to create multiple Observables. For example, say I have a service which returns pages of data this way:
public Observable<Integer> getPages(int totalPages) {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            logger.info("Getting pages");
            for (int i = 1; i <= totalPages; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
}
and another service which acts on each page of the data:
public Observable<String> actOnAPage(int pageNum) {
    return Observable.<String>create(s -> {
        Util.delay(200);
        logger.info("Acting on page {}", pageNum);
        s.onNext("Page " + pageNum);
        s.onCompleted();
    });
}
a way to use a Threadpool to process each page of data would be to chain it this way:
getPages(5)
    .flatMap(page -> aService.actOnAPage(page).subscribeOn(Schedulers.from(executor1)))
    .subscribe(s -> {
        logger.info("Completed Processing page: {}", s);
    });
See how the subscribeOn is applied to each Observable acting on a page. With this change, the output would look like this:
21:15:45.572 [main] INFO o.t.ThreadedObsTest - Getting pages
21:15:45.787 [SubscribeOn-1] INFO o.t.GeneralService - Acting on page 2
21:15:45.787 [SubscribeOn-0] INFO o.t.GeneralService - Acting on page 1
21:15:45.787 [SubscribeOn-4] INFO o.t.GeneralService - Acting on page 5
21:15:45.787 [SubscribeOn-3] INFO o.t.GeneralService - Acting on page 4
21:15:45.787 [SubscribeOn-2] INFO o.t.GeneralService - Acting on page 3
21:15:45.789 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 2
21:15:45.790 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 1
21:15:45.790 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 3
21:15:45.790 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 4
21:15:45.791 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 5
Now the threads in the threadpool are being used uniformly.
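As a point of comparison, the same fan-out shape can be sketched with only the JDK. This is an analogy rather than an equivalent of flatMap plus subscribeOn: one `CompletableFuture` is started per page on a fixed pool of 5 threads, and the thread that handled each page is recorded. The class name `PageFanOutSketch`, the method `processPages` and the `worker-N` thread names are hypothetical, and `Thread.sleep(200)` stands in for the `Util.delay(200)` call above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PageFanOutSketch {

    // Maps each page number to the name of the pool thread that processed it.
    public static Map<Integer, String> processPages(int totalPages) {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical "worker-N" names, similar in spirit to the SubscribeOn-%d pattern.
        ExecutorService pool = Executors.newFixedThreadPool(5,
                r -> new Thread(r, "worker-" + counter.getAndIncrement()));
        Map<Integer, String> handledBy = new ConcurrentHashMap<>();
        try {
            // Start one asynchronous task per page, all on the shared pool.
            List<CompletableFuture<Void>> tasks = IntStream.rangeClosed(1, totalPages)
                    .mapToObj(page -> CompletableFuture.runAsync(() -> {
                        sleep(200); // simulated work for this page
                        handledBy.put(page, Thread.currentThread().getName());
                    }, pool))
                    .collect(Collectors.toList());
            // Block until every page has been processed.
            tasks.forEach(CompletableFuture::join);
        } finally {
            pool.shutdown();
        }
        return handledBy;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        processPages(5).forEach((page, thread) ->
                System.out.println("page " + page + " on " + thread));
    }
}
```

Because the per-page work is submitted as independent tasks, the pool is free to spread it across its threads, which is the same reason the per-page subscribeOn above ends up using more than one thread.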
thanks
Hey, thanks for the post, it's really useful.
Could you upload and link the project on GitHub to test other use cases?
Good post but I think there are some problems. 1. Delay or sleep in a publisher should be considered cardinal sin. 2. Only use Observable.create as the last resort; in fact in RxJava 2, they discourage people from using it. 3. You're not using CountDownLatch for what it is; you set it to 1 and then wait in the subscribe methods, which completely defeats the purpose of the latch.
I converted your examples to a simpler version using Project Reactor. I used your GitHub domain name as the package, which I believe is a reasonable attribution. https://github.com/asarkar/java/blob/master/concurrency-learning/src/main/java/com/github/bijukunjummen/Service.java
Other than simplicity, I used the parallel operator for your last example that doesn't exist in RxJava1 but does in RxJava2.
Yes, I think your observations are totally fair, Abhijit. The point of the article, though, was simply to show which thread acts on the emissions and which on the subscriptions, and I did not give real thought to how the emissions were created. CountDownLatch is being used purely to ensure that the test method does not exit before the Observable completes, and I agree a better approach would have been to use the TestSubscriber in rx-java :-).
Thank you. I didn't use the TestSubscriber though.