Tuesday, April 26, 2016

Scatter-Gather using Spring Reactor Core

I have a good deal of working experience with the Netflix Rx-Java libraries and have previously blogged about using Rx-Java and Java 8 CompletableFuture for scatter-gather kinds of problems. Here I want to explore applying the same pattern using the Spring Reactor Core library.

tl;dr: If you are familiar with Netflix Rx-Java, you already know Spring Reactor Core. The APIs map beautifully, and I was thrilled to see that the Spring Reactor team has diligently used marble diagrams in their Javadoc.

Another quick point is that rx.Observable maps to either Flux or Mono: Flux when many items are emitted, Mono when at most one item is emitted.

With this, let me jump directly into the sample. I have a simple task (simulated using a delay) that is spawned a few times; I need to execute these tasks concurrently and then collect the results. Using rx.Observable, this can be represented the following way:

@Test
public void testScatterGather() throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool(5);

    List<Observable<String>> obs =
            IntStream.range(0, 10)
                .boxed()
                .map(i -> generateTask(i, executors))
                .collect(Collectors.toList());

    Observable<List<String>> merged = Observable.merge(obs).toList();
    List<String> result = merged.toBlocking().first();

    logger.info(result.toString());
}

private Observable<String> generateTask(int i, ExecutorService executorService) {
    return Observable
            .<String>create(s -> {
                Util.delay(2000);
                s.onNext( i + "-test");
                s.onCompleted();
            }).subscribeOn(Schedulers.from(executorService));
}


Note that I am blocking purely for the test.
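The Util.delay helper used in generateTask is not shown in this post. A minimal sketch, assuming it simply sleeps the calling thread for the given number of milliseconds (the class below is a hypothetical stand-in, not the original helper):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Util helper used by the tests; the original is not shown.
final class Util {

    private Util() {
    }

    // Blocks the calling thread for roughly the given number of milliseconds.
    static void delay(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
```

Swallowing the InterruptedException keeps the lambdas in the samples free of checked exceptions, at the cost of the delay possibly returning early when the thread is interrupted.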

Now, the same test using Spring Reactor Core translates to the following:

@Test
public void testScatterGather() {
    ExecutorService executors = Executors.newFixedThreadPool(5);

    List<Flux<String>> fluxList = IntStream.range(0, 10)
            .boxed()
            .map(i -> generateTask(executors, i))
            .collect(Collectors.toList());

    Mono<List<String>> merged = Flux.merge(fluxList).toList();

    List<String> list = merged.get();

    logger.info(list.toString());
}

public Flux<String> generateTask(ExecutorService executorService, int i) {
    return Flux.<String>create(s -> {
        Util.delay(2000);
        s.onNext(i + "-test");
        s.onComplete();
    }).subscribeOn(executorService);
}

It maps more or less one to one. A small difference is the Mono type. I personally felt that this type is a nice addition to a reactive library, as it makes it very clear whether a single item or more than one item is being emitted, and I have made use of it in the sample to hold the gathered list.
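For comparison, the same scatter-gather shape can be sketched with only the JDK's CompletableFuture, which was mentioned at the start of this post. This is a rough sketch and not the code from the earlier blog entry; the class name, the sleepQuietly helper and the shortened 200 ms delay are all assumptions made to keep it self-contained:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical class name; a JDK-only sketch of the scatter-gather pattern.
class CompletableFutureScatterGather {

    // Scatter: start one delayed task on the given pool.
    static CompletableFuture<String> generateTask(int i, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            sleepQuietly(200); // shorter than the 2000 ms in the post, to keep the sketch quick
            return i + "-test";
        }, executor);
    }

    // Gather: wait for every future and collect the results in submission order.
    static List<String> scatterGather(int taskCount, ExecutorService executor) {
        List<CompletableFuture<String>> futures = IntStream.range(0, taskCount)
                .mapToObj(i -> generateTask(i, executor))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // already complete at this point
                        .collect(Collectors.toList()))
                .join(); // blocking purely for the demonstration
    }

    // Assumed helper: sleeps and restores the interrupt flag if interrupted.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            System.out.println(scatterGather(10, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

One difference worth noting: merge in the Rx-Java and Reactor samples emits results as the tasks complete, whereas this sketch reads the futures back in the order they were submitted.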

These are still early explorations for me and I look forward to getting far more familiar with this excellent library.

Monday, April 18, 2016

First steps to Spring Boot Cassandra

If you want to start using the Cassandra NoSQL database with Spring Boot, the best resources are likely the Cassandra samples available here and the Spring Data Cassandra documentation.

Here I will take a slightly more roundabout route: installing Cassandra locally and running a basic test against it. I aim to develop this sample into a more comprehensive example in the next blog post.

Setting up a local Cassandra instance

Your mileage may vary, but the simplest way to get a local install of Cassandra running is to use the Cassandra Cluster Manager (ccm) utility, available here. The following creates and starts a three-node cluster named test running Cassandra 2.2.5:

ccm create test -v 2.2.5 -n 3 -s

A more traditional approach is simply to download it from the Apache site. If you are following along, the version of Cassandra that worked best for me is 2.2.5.

With either of the above, start up Cassandra. Using ccm (needed only if the cluster is not already running, since the -s flag above starts it on creation):

ccm start test

or with the download from the Apache site:

bin/cassandra -f

The -f flag keeps the process in the foreground, which makes stopping the process very easy once you are done with the samples.

Now connect to this Cassandra instance:

bin/cqlsh

and create a sample Cassandra keyspace:

CREATE KEYSPACE IF NOT EXISTS sample WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

Using Spring Boot Cassandra


As with anything Spring Boot related, there is a starter available that pulls in all the relevant Cassandra dependencies, specified here as a Gradle dependency:

compile('org.springframework.boot:spring-boot-starter-data-cassandra')

This pulls in the dependencies that trigger the auto-configuration of Cassandra-related instances, mainly a Cassandra session.
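The post does not show how the auto-configured session is pointed at the sample keyspace created earlier. One way to do that is through Spring Boot's application.properties; the file location and the values below are assumptions for a single local node on the default port, not settings taken from the sample repo:

```properties
# src/main/resources/application.properties (assumed location and values)
spring.data.cassandra.keyspace-name=sample
spring.data.cassandra.contact-points=localhost
spring.data.cassandra.port=9042
```

With a ccm cluster the first node normally listens on 127.0.0.1, so localhost should work as the contact point, but adjust it if your nodes are bound elsewhere.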

For the sample I have defined an entity called Hotel in the following way:

package cass.domain;

import org.springframework.data.cassandra.mapping.PrimaryKey;
import org.springframework.data.cassandra.mapping.Table;

import java.io.Serializable;
import java.util.UUID;

@Table("hotels")
public class Hotel implements Serializable {

    private static final long serialVersionUID = 1L;

    @PrimaryKey
    private UUID id;

    private String name;

    private String address;

    private String zip;

    private Integer version;

    public Hotel() {
    }

    public Hotel(String name) {
        this.name = name;
    }

    public UUID getId() {
        return id;
    }

    public String getName() {
        return this.name;
    }

    public String getAddress() {
        return this.address;
    }

    public String getZip() {
        return this.zip;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public void setZip(String zip) {
        this.zip = zip;
    }

    public Integer getVersion() {
        return version;
    }

    public void setVersion(Integer version) {
        this.version = version;
    }

}

and the Spring Data repository to manage this entity:

package cass.repository;

import cass.domain.Hotel;
import org.springframework.data.repository.CrudRepository;

import java.util.UUID;

public interface HotelRepository extends CrudRepository<Hotel, UUID>{}


A corresponding CQL table is required to hold this entity:

CREATE TABLE IF NOT EXISTS sample.hotels (
    id UUID,
    name varchar,
    address varchar,
    zip varchar,
    version int,
    primary key((id))
);


That is essentially it. The Spring Data support for Cassandra now manages all the CRUD operations for this entity, and a test looks like this:

import cass.domain.Hotel;
import cass.repository.HotelRepository;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SampleCassandraApplication.class)
public class SampleCassandraApplicationTest {

    @Autowired
    private HotelRepository hotelRepository;

    @Test
    public void repositoryCrudOperations() {
        Hotel sample = sampleHotel();
        this.hotelRepository.save(sample);

        Hotel savedHotel = this.hotelRepository.findOne(sample.getId());

        assertThat(savedHotel.getName(), equalTo("Sample Hotel"));

        this.hotelRepository.delete(savedHotel);
    }

    private Hotel sampleHotel() {
        Hotel hotel = new Hotel();
        hotel.setId(UUID.randomUUID());
        hotel.setName("Sample Hotel");
        hotel.setAddress("Sample Address");
        hotel.setZip("8764");
        return hotel;
    }

}

Here is the GitHub repo with this sample. There is not much to this sample yet. In the next blog post I will enhance it to account for the fact that, in a NoSQL system, it is very important to understand how data is distributed across a cluster, and to show how an entity like Hotel can be modeled for efficient CRUD operations.