Tuesday, June 23, 2015

Rx-java subscribeOn and observeOn

If you have been confused by Rx-java Observable subscribeOn and observeOn, one of the blog articles that helped me understand these operators is one by Graham Lea. I wanted to recreate a very small part of that article here, so consider a service which emits three values, one every 200 milliseconds:



package obs.threads;

import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class GeneralService {
    private static final Logger logger = LoggerFactory.getLogger(GeneralService.class);
    public Observable<String> getData() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing a Service");
            for (int i = 1; i <= 3; i++) {
                Util.delay(200);
                logger.info("Emitting {}", "root " + i);
                s.onNext("root " + i);
            }
            logger.info("End: Executing a Service");
            s.onCompleted();
        });
    }
}

Now, if I subscribe to this service this way:

@Test
public void testThreadedObservable1() throws Exception {
    Observable<String> ob1 = aService.getData();

    CountDownLatch latch = new CountDownLatch(1);

    ob1.subscribe(s -> {
        Util.delay(500);
        logger.info("Got {}", s);
    }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

    latch.await();
}

All of the emissions and subscriptions will act on the main thread and something along the following lines will be printed:

20:53:29.380 [main] INFO  o.t.GeneralService - Start: Executing a Service
20:53:29.587 [main] INFO  o.t.GeneralService - Emitting root 1
20:53:30.093 [main] INFO  o.t.ThreadedObsTest - Got root 1
20:53:30.298 [main] INFO  o.t.GeneralService - Emitting root 2
20:53:30.800 [main] INFO  o.t.ThreadedObsTest - Got root 2
20:53:31.002 [main] INFO  o.t.GeneralService - Emitting root 3
20:53:31.507 [main] INFO  o.t.ThreadedObsTest - Got root 3
20:53:31.507 [main] INFO  o.t.GeneralService - End: Executing a Service

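By default the emissions are not asynchronous: each call to onNext runs the subscriber on the same thread and returns only after the subscriber is done. That is why each "Got" line follows its "Emitting" line by about 500 ms, and the next "Emitting" line comes about 200 ms after that. So now, what is the behavior if subscribeOn is used: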

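import com.google.common.util.concurrent.ThreadFactoryBuilder;
import obs.Util;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
    private ExecutorService executor1 = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

    @Test
    public void testSubscribeOn() throws Exception {
        Observable<String> ob1 = aService.getData();

        CountDownLatch latch = new CountDownLatch(1);

        // subscribeOn: the subscription, and therefore the emission loop, runs on a pool thread
        ob1.subscribeOn(Schedulers.from(executor1)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        // Block the test thread until onCompleted fires
        latch.await();
    }
}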

Here I am using Guava's ThreadFactoryBuilder to give each thread in the thread pool a unique name following the pattern SubscribeOn-%d (SubscribeOn-0, SubscribeOn-1, ...). If I execute this code, the output is along these lines:

20:56:47.117 [SubscribeOn-0] INFO  o.t.GeneralService - Start: Executing a Service
20:56:47.322 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 1
20:56:47.828 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 1
20:56:48.032 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 2
20:56:48.535 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 2
20:56:48.740 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 3
20:56:49.245 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 3
20:56:49.245 [SubscribeOn-0] INFO  o.t.GeneralService - End: Executing a Service

Now the execution has moved away from the main thread: both the emissions and the subscriptions are processed on a thread borrowed from the thread pool, the same thread (SubscribeOn-0) throughout.
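To make the threading concrete without pulling in RxJava, here is a rough JDK-only sketch of what subscribeOn amounts to in this example. It is a simplification under stated assumptions, not RxJava's implementation: `SubscribeOnSketch`, `emitRoots`, `loggingSubscriber` and `named` are hypothetical names, a plain `ThreadFactory` stands in for Guava's `ThreadFactoryBuilder`, and the delays are left out. The point it illustrates is that the emission loop pushes each value synchronously to the subscriber, so moving the single subscribe call onto a pool thread moves both the emissions and the processing there.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class SubscribeOnSketch {
    // Hypothetical plain-JDK stand-in for
    // new ThreadFactoryBuilder().setNameFormat(prefix + "-%d").build(): names prefix-0, prefix-1, ...
    static ThreadFactory named(String prefix) {
        AtomicInteger count = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + count.getAndIncrement());
    }

    // Stand-in for the Observable.create body in GeneralService (delays omitted):
    // each value is pushed synchronously to onNext on whatever thread runs this method.
    static void emitRoots(List<String> log, Consumer<String> onNext) {
        for (int i = 1; i <= 3; i++) {
            log.add("Emitting root " + i + " on " + Thread.currentThread().getName());
            onNext.accept("root " + i);
        }
    }

    // Stand-in for the subscriber lambda: records which thread processed each value.
    static Consumer<String> loggingSubscriber(List<String> log) {
        return s -> log.add("Got " + s + " on " + Thread.currentThread().getName());
    }

    // Plain subscribe: emission and processing both run on the caller's thread.
    static List<String> subscribeDirect() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        emitRoots(log, loggingSubscriber(log));
        return log;
    }

    // subscribeOn-like: the whole subscription (emission loop included) is submitted
    // once to the pool, so the subscriber also runs on that one pool thread.
    static List<String> subscribeOn() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(5, named("SubscribeOn"));
        pool.execute(() -> emitRoots(log, loggingSubscriber(log)));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

Calling `subscribeDirect()` records every line against the caller's thread, like the first log in this post, while `subscribeOn()` records every line against `SubscribeOn-0`: the pool has five threads, but only one task is ever submitted to it.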

And what happens if observeOn is used:

public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
    private ExecutorService executor2 = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("ObserveOn-%d").build());

    @Test
    public void testObserveOn() throws Exception {
        Observable<String> ob1 = aService.getData();

        CountDownLatch latch = new CountDownLatch(1);

        // observeOn: emission stays on the subscribing (main) thread,
        // while onNext/onCompleted are delivered on executor2
        ob1.observeOn(Schedulers.from(executor2)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        latch.await();
    }
}

The output is along these lines:

21:03:08.655 [main] INFO  o.t.GeneralService - Start: Executing a Service
21:03:08.860 [main] INFO  o.t.GeneralService - Emitting root 1
21:03:09.067 [main] INFO  o.t.GeneralService - Emitting root 2
21:03:09.268 [main] INFO  o.t.GeneralService - Emitting root 3
21:03:09.269 [main] INFO  o.t.GeneralService - End: Executing a Service
21:03:09.366 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 1
21:03:09.872 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 2
21:03:10.376 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 3

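The emissions are now back on the main thread, but the subscriptions are processed on a thread from the pool (ObserveOn-1). Note also that the emitter no longer waits for the subscriber: all three values are emitted, roughly 200 ms apart, before the subscriber has logged its first value.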

That is the difference: when subscribeOn is used, the emissions are performed on the specified Scheduler (and, since nothing else switches threads, so are the subscriptions); when observeOn is used, only the subscriptions are performed on the specified Scheduler!
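Again as a rough JDK-only approximation, and not how RxJava implements observeOn internally, the observeOn case can be pictured as the emitter handing each value to a queue that a single worker thread drains in order. In this sketch `ObserveOnSketch` and `sleepQuietly` are hypothetical names, a single-thread executor is an assumed stand-in for the ordered, one-at-a-time delivery a Scheduler worker provides, and the 50 ms sleep plays the role of the subscriber's `Util.delay(500)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ObserveOnSketch {
    // Emission loop runs on the calling thread; every value is handed to a single
    // worker thread, which processes values one at a time, in emission order.
    static List<String> observeOn() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger count = new AtomicInteger();
        ExecutorService worker = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "ObserveOn-" + count.getAndIncrement()));
        for (int i = 1; i <= 3; i++) {
            String value = "root " + i;
            log.add("Emitting " + value + " on " + Thread.currentThread().getName());
            // The emitter only enqueues the value; it does not wait for the subscriber.
            worker.execute(() -> {
                sleepQuietly(50); // slow subscriber, like Util.delay(500) in the test
                log.add("Got " + value + " on " + Thread.currentThread().getName());
            });
        }
        worker.shutdown(); // values already queued are still delivered
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The returned log has the `Emitting` lines on the calling thread and the `Got` lines, in order, on `ObserveOn-0`. Because `execute` only enqueues the value, the loop typically finishes emitting well before the slow subscriber catches up, mirroring the timestamps in the observeOn log above.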

When both are specified, the output is equally predictable: the emissions happen on the subscribeOn Scheduler and the subscriptions on the observeOn Scheduler. Note that in all of these cases I created a Scheduler from a thread pool with 5 threads, yet only 1 of those threads was really used, both for emitting values and for processing subscriptions. This is the normal behavior of an Observable: it delivers its values sequentially, one at a time, so a single subscription keeps only one thread busy. If you want to make more efficient use of the thread pool, one approach is to create multiple Observables. For example, say I have a service which returns pages of data this way:

public Observable<Integer> getPages(int totalPages) {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            logger.info("Getting pages");
            for (int i = 1; i <= totalPages; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
}

and another service which acts on each page of the data:

public Observable<String> actOnAPage(int pageNum) {
    return Observable.<String>create(s -> {
        Util.delay(200);
        logger.info("Acting on page {}",  pageNum);
        s.onNext("Page " + pageNum);
        s.onCompleted();
    });
}

A way to use the thread pool to process each page of data would be to chain them this way:

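CountDownLatch latch = new CountDownLatch(1);
getPages(5)
        .flatMap(page -> aService.actOnAPage(page).subscribeOn(Schedulers.from(executor1)))
        .subscribe(s -> logger.info("Completed Processing page: {}", s),
                e -> logger.error(e.getMessage(), e),
                () -> latch.countDown());
latch.await(); // keep the test alive until all pages are processed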

Note that subscribeOn is applied to each Observable acting on a page, not to the outer Observable. With this change, the output looks like this:

21:15:45.572 [main] INFO  o.t.ThreadedObsTest - Getting pages
21:15:45.787 [SubscribeOn-1] INFO  o.t.GeneralService - Acting on page 2
21:15:45.787 [SubscribeOn-0] INFO  o.t.GeneralService - Acting on page 1
21:15:45.787 [SubscribeOn-4] INFO  o.t.GeneralService - Acting on page 5
21:15:45.787 [SubscribeOn-3] INFO  o.t.GeneralService - Acting on page 4
21:15:45.787 [SubscribeOn-2] INFO  o.t.GeneralService - Acting on page 3
21:15:45.789 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 2
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 1
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 3
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 4
21:15:45.791 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 5

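Now the threads in the thread pool are being used uniformly: the five pages are acted on in parallel, one per thread. The final subscriber still runs on one thread at a time (SubscribeOn-1 here), because flatMap merges the inner Observables back into a single sequential stream.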
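The same fan-out idea can be sketched with plain `CompletableFuture`s, which may help show why per-page scheduling spreads the work: every page becomes its own asynchronous task on the pool, much as every inner Observable gets its own subscribeOn. This is an analogy, not RxJava code; `FanOutSketch`, `processPages` and this version of `actOnAPage` are hypothetical, and unlike flatMap, which emits results in completion order, the sketch collects them in page order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FanOutSketch {
    // Hypothetical stand-in for actOnAPage: ~200 ms of "work", then report the thread used.
    static String actOnAPage(int pageNum) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Page " + pageNum + " on " + Thread.currentThread().getName();
    }

    // One asynchronous task per page, each scheduled on the pool independently
    // (the analogue of subscribeOn on every inner Observable), then merged.
    static List<String> processPages(int totalPages) {
        AtomicInteger count = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(5,
                r -> new Thread(r, "SubscribeOn-" + count.getAndIncrement()));
        try {
            List<CompletableFuture<String>> inFlight = new ArrayList<>();
            for (int page = 1; page <= totalPages; page++) {
                int p = page;
                inFlight.add(CompletableFuture.supplyAsync(() -> actOnAPage(p), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : inFlight) {
                results.add(f.join()); // waits for each page; keeps page order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

`processPages(5)` returns five results, each computed on a different `SubscribeOn-N` thread, because a fixed pool starts a new thread for each submitted task until it reaches its core size. The whole call should take roughly one page's worth of delay rather than five.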

5 comments:

  1. Hey, thanks for the post, it's really useful.
    Could you upload and link the project on GitHub to test other use cases?

  2. Good post, but I think there are some problems. 1. Delay or sleep in a publisher should be considered a cardinal sin. 2. Only use Observable.create as a last resort; in fact, in RxJava 2 they discourage people from using it. 3. You're not using CountDownLatch for what it is; you set it to 1 and then wait in the subscribe methods, which completely defeats the purpose of the latch.
    I converted your examples to a simpler version using Project Reactor. I used your GitHub domain name as the package, which I believe is a reasonable attribution. https://github.com/asarkar/java/blob/master/concurrency-learning/src/main/java/com/github/bijukunjummen/Service.java

    Other than simplicity, I used the parallel operator for your last example that doesn't exist in RxJava1 but does in RxJava2.

    1. Yes, I think your observations are totally fair, Abhijit. The point of the article, though, was simply to show which thread is acting on the emissions and which on the subscriptions, and I did not give real thought to how the emissions were created. The CountDownLatch is used purely to ensure that the test method does not exit before the Observable completes, and I agree that a better approach would have been to use the TestSubscriber in rx-java :-).

    2. Thank you. I didn't use the TestSubscriber though.
