Tuesday, December 27, 2016

Practical Reactor operations - Retrieve Details of a Cloud Foundry Application

CF-Java-Client is a library which enables programmatic access to the Cloud Foundry Cloud Controller API. It is built on top of Project Reactor, an implementation of the Reactive Streams specification, and it is a fun exercise to use this library to do something practical in a Cloud Foundry environment.

Consider a sample use case - given an application id, I need to find more details of the application along with the details of the organization and the space that it belongs to.

To start with, the basis of all API operations with cf-java-client is a type unsurprisingly called CloudFoundryClient (org.cloudfoundry.client.CloudFoundryClient); cf-java-client's github page has details on how to get hold of an instance of this type.

Given a CloudFoundryClient instance, the details of an application given its id can be obtained as follows:

Mono<GetApplicationResponse> applicationResponseMono = this.cloudFoundryClient
  .applicationsV2().get(GetApplicationRequest.builder().applicationId(applicationId).build());

Note that the API returns a Reactor "Mono" type; this is in general the behavior of all the API calls of cf-java-client.


  • If an API returns one item then typically a Mono type is returned
  • If the API is expected to return more than one item then a Flux type is returned, and
  • If the API is called purely for side effects - say printing some information then it returns a Mono<Void> type


The next step is to retrieve the space identifier from the response and make an API call to retrieve the details of the space, which looks like this:

Mono<Tuple2<GetApplicationResponse, GetSpaceResponse>> appAndSpaceMono = applicationResponseMono
  .and(appResponse -> this.cloudFoundryClient.spaces()
    .get(GetSpaceRequest.builder()
      .spaceId(appResponse.getEntity().getSpaceId()).build()));



Here I am using the "and" operator to combine the application response with another Mono that returns the space information; the result is a "Tuple2" type holding both pieces of information - the application detail and the detail of the space that it is in.

Finally to retrieve the Organization that the app is deployed in:

Mono<Tuple3<GetApplicationResponse, GetSpaceResponse, GetOrganizationResponse>> t3 =
  appAndSpaceMono.then(tup2 -> this.cloudFoundryClient.organizations()
      .get(GetOrganizationRequest.builder()
        .organizationId(tup2.getT2().getEntity()
          .getOrganizationId())
        .build())
      .map(orgResp -> Tuples.of(tup2.getT1(), tup2.getT2(),
        orgResp)));

Here a "then" operation is being used to retrieve the organization detail given the id from the previous step and the result added onto the previous tuple to create a Tuple3 type holding the "Application Detail", "Space Detail" and the "Organization Detail". "then" is the equivalent of flatMap operator familiar in the Scala and ReactiveX world.

This essentially covers the way you would typically deal with the cf-java-client library - use the fact that it is built on the excellent Reactor library and its collection of very useful operators to pull results together. The final step is to transform the result to a type that may be more relevant to your domain and to handle any errors along the way:

Mono<AppDetail> appDetail =  
 t3.map(tup3 -> {
   String appName = tup3.getT1().getEntity().getName();
   String spaceName = tup3.getT2().getEntity().getName();
   String orgName = tup3.getT3().getEntity().getName();
   return new AppDetail(appName, orgName, spaceName);
  }).otherwiseReturn(new AppDetail("", "", ""));
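
The AppDetail type used above is just a simple holder for the three names; a minimal sketch of such a class (the actual one is in the repo linked below) could look like this:

// Simple immutable holder for the application, organization and space names
public class AppDetail {
    private final String applicationName;
    private final String organizationName;
    private final String spaceName;

    public AppDetail(String applicationName, String organizationName, String spaceName) {
        this.applicationName = applicationName;
        this.organizationName = organizationName;
        this.spaceName = spaceName;
    }

    public String getApplicationName() {
        return applicationName;
    }

    public String getOrganizationName() {
        return organizationName;
    }

    public String getSpaceName() {
        return spaceName;
    }
}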


If you are interested in trying out a working sample, I have an example available in my github repo here - https://github.com/bijukunjummen/boot-firehose-to-syslog

And the code shown in the article is available here - https://github.com/bijukunjummen/boot-firehose-to-syslog/blob/master/src/main/java/io.pivotal.cf.nozzle/service/CfAppDetailsService.java


Sunday, December 18, 2016

Spring Boot and Application Context Hierarchy

Spring Boot supports a simple way of specifying a Spring application context hierarchy.

This post simply demonstrates this feature; I am yet to find a good use for it in the projects I have worked on. Spring Cloud uses this feature for creating a bootstrap context where properties are loaded up, if required, from an external configuration server and made available to the main application context later on.

To quickly take a step back - a Spring Application Context manages the lifecycle of all the beans registered with it. Application Context hierarchies provide a way to reuse beans: beans defined in the parent context are accessible in the child contexts.

Consider a contrived use-case for multiple application contexts and an application context hierarchy - providing two different ports with a different set of endpoints at each of these ports.


Child1 and Child2 are typical Spring Boot Applications, along these lines:

package child1;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import root.RootBean;

@SpringBootApplication
@PropertySource("classpath:/child1.properties")
public class ChildContext1 {

    @Bean
    public ChildBean1 childBean(RootBean rootBean, @Value("${root.property}") String someProperty) {
        return new ChildBean1(rootBean, someProperty);
    }
}


Each of the applications resides in its own root package to avoid collisions when scanning for beans. Note that the beans in the child contexts depend on a bean that is expected to come from the root context.
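
The root context itself can be a minimal configuration that exposes the shared bean - a sketch along these lines (the actual RootContext and RootBean are in the linked sample):

package root;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RootContext {

    // Shared bean - made available to both child contexts via the context hierarchy
    @Bean
    public RootBean rootBean() {
        return new RootBean();
    }
}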

The port to listen on is provided as a property; since the two contexts are expected to listen on different ports, I have explicitly specified the property file to load, with content along these lines:

server.port=8080
spring.application.name=child1
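
and correspondingly for the second child context, a child2.properties with a different port (the values here are just illustrative):

server.port=8081
spring.application.name=child2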

Given this set-up, Spring Boot provides a fluent interface to load up the root context and the two child contexts:

SpringApplicationBuilder appBuilder =
       new SpringApplicationBuilder()
               .parent(RootContext.class)
               .child(ChildContext1.class)
               .sibling(ChildContext2.class);

ConfigurableApplicationContext applicationContext  = appBuilder.run();

The application context returned by the SpringApplicationBuilder appears to be the final one in the chain, defined via ChildContext2 above.

If the application is now started up, there would be a root context with two different child contexts each exposing an endpoint via a different port. A visualization via the /beans actuator endpoint shows this:


Not everything is clean though - there are errors displayed in the console related to exporting JMX endpoints, however these are informational and don't appear to affect the start-up.

Samples are available in my github repo

Tuesday, November 29, 2016

Using Kafka with JUnit

One of the neat features that the excellent Spring Kafka project provides, apart from an easier to use abstraction over the raw Kafka Producer and Consumer, is a way to use Kafka in tests. It does this by providing an embedded version of Kafka that can be set up and torn down very easily.

All that a project needs to do to include this support is to add the "spring-kafka-test" module, for a gradle build the following way:

testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

Note that I am using a snapshot version of the project as this has support for Kafka 0.10+.

With this dependency in place, an embedded Kafka cluster can be spun up in a test using a JUnit @ClassRule:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

This would start up a Kafka cluster with 2 brokers and a topic called "messages" with 2 partitions; the class rule makes sure that the cluster is spun up before the tests are run and then shut down at the end of them.

Here is what a sample with the raw Kafka Producer/Consumer using this embedded Kafka cluster looks like; the embedded Kafka can be used for retrieving the properties required by the Kafka Producer/Consumer:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();


Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();

A little more comprehensive test is available here

Tuesday, November 22, 2016

Recipe for getting started with Spring Boot and Angular 2

I am primarily a service developer who has to create some passable UI's once in a while. I was adept at basic AngularJS1 based UI's and could get stuff done by using an approach that I have outlined before. With the advent of Angular 2 I had to unfortunately throw my previous approach out of the window and now have an approach with Spring Boot/ Angular 2 that works equally well.

The approach essentially hinges on the fact that a Spring Boot web application looks for static content in a very specific location - the src/main/resources/static folder from the root of the project - so if I can get the final js content into this folder, then I am golden.

So let us jump into it.

Pre-requisites

There is primarily one pre-requisite - the excellent angular-cli tool which is a blessing for UI ignorant developers like me.

The second optional but useful pre-requisite is the Spring-Boot CLI tool described here


Generating a SPA Project


Given these two tools, first create a Spring Boot web project either by starting from http://start.spring.io or using the following CLI command:

spring init --dependencies=web spring-boot-angular2-static-sample

At this point a starter project should have been generated in the spring-boot-angular2-static-sample folder. From that folder generate an Angular 2 project using the angular-cli.

ng init

Change the location where angular-cli builds the artifacts: edit angular-cli.json and modify it as follows:
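
The key change is the outDir attribute of the app entry in angular-cli.json, pointed at the Spring Boot static folder - a sketch showing just the relevant attribute (the rest of the generated file stays as-is; attribute names are those of the angular-cli version current at the time of writing):

"apps": [
  {
    "outDir": "src/main/resources/static"
  }
]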




Now build the static content:

ng build

this should get the static content to the src/main/resources/static folder.

And start up the Spring-Boot app:

mvn spring-boot:run

and the Angular 2 based UI should render cleanly!

Live Reload

One of the advantages of using the Angular-cli is the excellent tool-chain that it comes with - one of them being the ability to make changes and see them reflected immediately in the UI. This ability is lost with the approach documented here, where the UI may be primarily driven by services hosted in the Spring-Boot project. Getting the live reload feature back for Angular 2 development is however a cinch.

First proxy the backend: create a proxy.conf.json file with an entry which looks like this:

{
  "/api": {
    "target": "http://localhost:8080",
    "secure": false
  }
}

and start up the Angular-cli server using the command:

ng serve --proxy-config proxy.conf.json

and start up the server part independently using:

mvn spring-boot:run

That is it - now the UI development can be carried out independent of the server side API's! For an even greater punch, just use the excellent devtools that are packaged with Spring Boot to get a live reload (more a restart) feature on the server side also.

Conclusion

This is the recipe I use for any basic UI that I may have to create. This approach is probably not ideal for large projects but should be a perfect fit for small internal projects. I have a sample starter with a backend call hooked up available in my github repo here.

Sunday, November 13, 2016

Spring Kafka Producer/Consumer sample

My objective here is to show how Spring Kafka provides an abstraction to raw Kafka Producer and Consumer API's that is easy to use and is familiar to someone with a Spring background.

Sample scenario


The sample scenario is a simple one: I have a system which produces a message and another which processes it.


Implementation using Raw Kafka Producer/Consumer API's

To start with I have used raw Kafka Producer and Consumer API's to implement this scenario. If you would rather look at the code, I have it available in my github repo here.

Producer

The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:

KafkaProducer<String, WorkUnit> producer 
    = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

I have used a variation of the KafkaProducer constructor which takes in a custom Serializer to convert the domain object to a json representation.
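
As an illustration, a Jackson based implementation of such a serializer could look along these lines (a sketch, not the exact class in the repo):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class WorkUnitJsonSerializer implements Serializer<WorkUnit> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }

    @Override
    public byte[] serialize(String topic, WorkUnit workUnit) {
        try {
            // convert the domain object to its json byte representation
            return objectMapper.writeValueAsBytes(workUnit);
        } catch (Exception e) {
            throw new SerializationException("Error serializing WorkUnit to json", e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}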

Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster; here I have used a synchronous version of the sender which waits for a response to come back.

ProducerRecord<String, WorkUnit> record 
                = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);

RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();

Consumer

On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a Deserializer which knows how to read a json message and translate that to the domain instance:

KafkaConsumer<String, WorkUnit> consumer  
    = new KafkaConsumer<>(props, stringKeyDeserializer(), workUnitJsonValueDeserializer());

Once an instance of KafkaConsumer is available a listener loop can be put in place which reads a batch of records, processes them and waits for more records to come through:

consumer.subscribe(Collections.singletonList("workunits"));

try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
            log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());

        }
    }
} finally {
    this.consumer.close();
}


Implementation using Spring Kafka 


I have the implementation using Spring-kafka available in my github repo.

Producer

Spring-Kafka provides a KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:

@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}

@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic("workunits");
    return kafkaTemplate;
}

One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as json and then to convert it back, Spring-Kafka provides a Serializer/Deserializer for json out of the box.
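
Concretely, the workUnitJsonSerializer() and workUnitJsonValueDeserializer() helpers referenced in the configuration could simply return these provided types - a sketch of what that wiring might look like:

// Sketch: helpers in the same configuration class, returning Spring Kafka's
// built-in json (de)serializers (org.springframework.kafka.support.serializer)
@Bean
public Serializer<WorkUnit> workUnitJsonSerializer() {
    return new JsonSerializer<>();
}

@Bean
public Deserializer<WorkUnit> workUnitJsonValueDeserializer() {
    return new JsonDeserializer<>(WorkUnit.class);
}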

And using KafkaTemplate to send a message:

SendResult<String, WorkUnit> sendResult = 
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();

LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

Consumer

The consumer part is implemented using a Listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. First, here is the configuration to set up a listener container:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}



and the service which responds to messages read by the container:

@Service
public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}

Here all the complexity of setting up a listener loop, as was needed with the raw consumer, is avoided and nicely hidden by the listener container.


Conclusion

I have brushed over a lot of the internals - setting up batch sizes, variations in acknowledgement, different API signatures. My intention is just to demonstrate a common use case using the raw Kafka API's and show how the Spring-Kafka wrapper simplifies it.

If you are interested in exploring further, the raw producer consumer sample is available here and the Spring Kafka one here

Saturday, November 5, 2016

Parallelizing Hystrix calls

This is more common sense than anything else. If you make calls to multiple remote systems and aggregate the results in some way, represented as a marble diagram here:



And if you protect each of the remote calls using the awesome Hystrix library, then the best way to aggregate the results is using native rx-java operators.

So consider a Hystrix command; assume that such a command would in reality wrap around a remote call:

public class SampleRemoteCallCommand1 extends HystrixCommand<String> {

    public SampleRemoteCallCommand1() {
        super(Setter.withGroupKey(
                HystrixCommandGroupKey.Factory.asKey("sample1"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("sample1")
                )
        );
    }

    @Override
    protected String run() throws Exception {
        DelayUtil.delay(700);
        return "something";
    }

    @Override
    protected String getFallback() {
        return "error";
    }
}


a service which would aggregate responses from multiple such remote calls together would look like this:

SampleRemoteCallCommand1 command1 = new SampleRemoteCallCommand1();
SampleRemoteCallCommand2 command2 = new SampleRemoteCallCommand2();

Observable<String> result1Obs = command1.toObservable();
Observable<Integer> result2Obs = command2.toObservable();

Observable<String> result =
        Observable.zip(result1Obs, result2Obs, (result1, result2) -> result1 + result2);
        

Essentially, instead of synchronously executing the Hystrix commands, we just use the "toObservable()" method to return an Rx-Java Observable representation of the result and use the different ways that Observable provides to aggregate the results together - in this specific instance the zip operator.
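
To actually trigger the calls and obtain the combined value, the caller could for instance block on the composed Observable - just a usage sketch; in a fully reactive flow you would subscribe instead:

// Subscribing via toBlocking() triggers both commands, which run in parallel
// on their respective Hystrix thread pools; the calling thread waits only for
// the zipped result
String combined = result.toBlocking().single();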

The main advantage of this approach is that we re-use the Hystrix threadpool that each command uses to run the tasks in parallel. Here is a sample project which demonstrates this - https://github.com/bijukunjummen/sample-hystrix-parallel

Just a note of caution - if your Hystrix command does not have a fallback and if you use this approach with one of the remote calls failing, you may see a memory leak in your app - I had opened an issue regarding this leak, which the excellent Netflix Team has already addressed.

Friday, October 21, 2016

Tracing Spring Integration Flow with Spring Cloud Sleuth

Spring Cloud Sleuth is an awesome project that provides a way to trace requests that span multiple systems. Spring Cloud Sleuth can optionally export this trace data to Zipkin, where it can be visualized in a neat way. I especially love the fact that Spring Cloud Sleuth integrates deeply with Spring Integration and can nicely trace out the flow of a message.


Consider the following -



I have two different systems here - a work dispatcher producing "Work Unit"s and a Work Handler consuming them. They talk over a RabbitMQ broker. Just to mix the flow up a bit, I also have a retry mechanism in place which retries the message every 20 seconds in case of a processing failure.



Both these systems are described using the Spring Integration Java DSL; the outbound flow dispatching the WorkUnits looks like this:

@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .log()
                .handle(Amqp.outboundGateway(rabbitConfig.worksRabbitTemplate()))
                .transform(Transformers.fromJson(WorkUnitResponse.class))
                .get();
    }

    @Bean
    public IntegrationFlow handleErrors() {
        return IntegrationFlows.from("errorChannel")
                .transform((MessagingException e) -> e.getFailedMessage().getPayload())
                .transform(Transformers.fromJson(WorkUnit.class))
                .transform((WorkUnit failedWorkUnit) -> new WorkUnitResponse(failedWorkUnit.getId(), failedWorkUnit.getDefinition(), false))
                .get();
    }

}

This is eminently readable - the "Work Unit" comes through a "works channel" and is dispatched to a RabbitMQ queue after tranforming to json. Note that the dispatch is via an outbound gateway, this means that the Spring integration would put the necessary infrastructure in place to wait for a reply to be back from the remote system. In case of an error, say if the reply does not appear in time, a stock response is provided back to the user.

On the Work Handler side a similar flow handles the message:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundGateway(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count < 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .transform(Transformers.toJson())
                .get();
    }

}

The only wrinkle in this flow is the retry logic which discards the message after 3 retries. If you are interested in the details of how the retry is being hooked up, I have more details here.


So now, given this fairly involved flow, here is what Spring Cloud Sleuth with Zipkin integrated looks like:



Spring Cloud Sleuth intercepts every message channel and tags the message as it flows through the channel.
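
Getting this tracing in place is mostly a matter of adding the Sleuth and Zipkin-exporting dependencies to both of the applications; with a gradle build, and assuming the Spring Cloud dependency management is already imported, the additions would be along these lines:

compile "org.springframework.cloud:spring-cloud-starter-sleuth"
compile "org.springframework.cloud:spring-cloud-sleuth-zipkin"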


Now for something a little more interesting, if the flow were more complex with 3 retries each 20 seconds apart, again the flow is beautifully brought out by Spring Cloud Sleuth and its integration with Zipkin.


Conclusion


If you maintain a Spring Integration based flow, Spring Cloud Sleuth is a worthwhile addition to the project - it can trace the runtime path of a message and show it visually using the Zipkin UI. I look forward to exploring more of the nuances of this excellent project.


The sample that I have demonstrated here is available in my github repo - https://github.com/bijukunjummen/si-with-sleuth-sample

Thursday, September 29, 2016

Spring-Reactive samples - Mono and Single

This is just a little bit of a learning from my previous post where I had tried out Spring's native support for reactive programming.

Just to quickly recap, my objective was to develop a reactive service which takes in a request which looks like this:

{
 "id":1,
  "delay_by": 2000,
  "payload": "Hello",
  "throw_exception": false
}

and returns a response along these lines:

{
  "id": "1",
  "received": "Hello",
  "payload": "Response Message"
}

I had demonstrated this in two of the ways that the upcoming Spring reactive model supports - using the Reactor-Core Flux type as a return type and using the Rx-java Observable type.

However the catch with these types is that the response would look something like this:

[{"id":"1","received":"Hello","payload":"From RxJavaService"}]

Essentially an array, and the reason is obvious - Flux and Observable represent zero or more asynchronous emissions, and so the Spring Reactive Web framework has to represent such a result as an array.

The fix to return the expected json is essentially to return a type which represents one value - such a type is the Mono in Reactor-Core or the Single in Rx-java. Both these types are as capable as their multi-valued counterparts in providing functions which combine and transform their elements.

So with this change the controller signature with Mono looks like this:

@RequestMapping(path = "/handleMessageReactor", method = RequestMethod.POST)
public Mono<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessageMono(message);
}


and with Single like this:

@RequestMapping(path = "/handleMessageRxJava", method = RequestMethod.POST)
public Single<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessageSingle(message);
}
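
For completeness, a sketch of what a service method returning a Mono might look like - this assumes the Message/MessageAcknowledgement types and the Util.delay helper from the earlier post, and is not the exact implementation in the repo:

public Mono<MessageAcknowledgement> handleMessageMono(Message message) {
  return Mono.fromCallable(() -> {
    // simulate a slow downstream call before acknowledging
    Util.delay(message.getDelayBy());
    return new MessageAcknowledgement(message.getId(), message.getPayload(), "Response Message");
  });
}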

I have the sample code available in my github repo

Saturday, September 10, 2016

RabbitMQ retries using Spring Integration

I recently read about an approach to retry with RabbitMQ here and wanted to try a similar approach with Spring Integration, which provides an awesome set of integration abstractions.

TL;DR - the problem being solved is to retry a message (in case of failures in processing) a few times with a large delay between retries (say 10 mins +). The approach makes use of the RabbitMQ support for Dead Letter Exchanges and looks something like this:




The gist of the flow is:
1. A Work dispatcher creates "Work Unit"(s) and sends them to a RabbitMQ queue via an exchange.
2. The Work queue is set up with a Dead Letter exchange. If the message processing fails for any reason, the "Work Unit" ends up in the Work Unit Dead Letter Queue.
3. The Work Unit Dead Letter queue is in turn set up with the Work Unit exchange as its Dead Letter Exchange, this way creating a cycle. Further, the expiration of messages in the dead letter queue is set to, say, 10 mins - this way once a message expires it will be back again in the Work Unit queue.
4. To break the cycle, the processing code has to stop processing once a certain count threshold is exceeded.



Implementation using Spring Integration


I have covered a straight happy path flow using Spring Integration and RabbitMQ before; here I will mostly be building on top of that code.

A good part of the set-up is the configuration of the appropriate dead letter exchanges/queues, which looks like this when expressed using Spring's Java Configuration:

@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    Exchange worksExchange() {
        return ExchangeBuilder.topicExchange("work.exchange")
                .durable()
                .build();
    }


    @Bean
    public Queue worksQueue() {
        return QueueBuilder.durable("work.queue")
                .withArgument("x-dead-letter-exchange", worksDlExchange().getName())
                .build();
    }

    @Bean
    Binding worksBinding() {
        return BindingBuilder
                .bind(worksQueue())
                .to(worksExchange()).with("#").noargs();
    }
    
    // Dead letter exchange for holding rejected work units..
    @Bean
    Exchange worksDlExchange() {
        return ExchangeBuilder
                .topicExchange("work.exchange.dl")
                .durable()
                .build();
    }

    //Queue to hold Deadletter messages from worksQueue
    @Bean
    public Queue worksDLQueue() {
        return QueueBuilder
                .durable("works.queue.dl")
                .withArgument("x-message-ttl", 20000)
                .withArgument("x-dead-letter-exchange", worksExchange().getName())
                .build();
    }

    @Bean
    Binding worksDlBinding() {
        return BindingBuilder
                .bind(worksDLQueue())
                .to(worksDlExchange()).with("#")
                .noargs();
    }
    ...
}


Note that here I have set the TTL of the Dead Letter queue to 20 seconds; this means that after 20 seconds a failed message will be back in the processing queue. Once this set-up is in place and the appropriate structures are created in RabbitMQ, the consuming part of the code looks like this, expressed using the Spring Integration Java DSL:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .get();
    }

}

Most of the retry logic here is handled by the RabbitMQ infrastructure; the only change here is to break the cycle by explicitly discarding the message after a certain number of retries. This break is expressed as a filter above, looking at the header called "x-death" that RabbitMQ adds to the message once it is sent to the Dead Letter exchange. The filter is admittedly a little ugly - it can likely be expressed a little better in Java code.
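
As an illustration of what that might look like, the header check could be pulled into a plain Java method - a hypothetical sketch, not the code in the repo:

import org.springframework.messaging.Message;

import java.util.List;
import java.util.Map;

public class RetryCountSelector {

    // Returns true if the message should still be processed. RabbitMQ adds an
    // "x-death" header (a list of maps carrying a "count" entry) each time a
    // message is dead-lettered.
    @SuppressWarnings("unchecked")
    public static boolean underRetryLimit(Message<?> message) {
        List<Map<String, Object>> xDeath =
                (List<Map<String, Object>>) message.getHeaders().get("x-death");
        if (xDeath == null || xDeath.isEmpty()) {
            return true;
        }
        return ((Number) xDeath.get(0).get("count")).intValue() <= 3;
    }
}

The filter line in the flow could then reference this method, along the lines of .filter(Message.class, RetryCountSelector::underRetryLimit, f -> f.discardChannel("nullChannel")).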



One more thing to note is that the retry logic could have been expressed in-process using Spring Integration, however I wanted to investigate a flow where the retry delays can be high (say 15 to 20 mins), which will not work well in-process and is also not cluster safe, as I want any instance of the application to potentially handle the retry of a message.

If you want to explore further, do try the sample at my github repo - https://github.com/bijukunjummen/si-dsl-rabbit-sample

Reference:

Retry With RabbitMQ: http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq

Wednesday, August 24, 2016

Integrating with RabbitMQ using Spring Cloud Stream

In my previous post I wrote about a very simple integration scenario between two systems - one generating a work unit and another processing that work unit - and how Spring Integration makes such an integration very easy.



Here I will demonstrate how this integration scenario can be simplified even further using Spring Cloud Stream

I have the sample code available here - the right maven dependencies for Spring Cloud Stream are available in the pom.xml.

Producer


So again, starting with the producer responsible for generating the work units. All that needs to be done code-wise to send messages to RabbitMQ is to have a java configuration along these lines:

@Configuration
@EnableBinding(WorkUnitsSource.class)
@IntegrationComponentScan
public class IntegrationConfiguration {}

This looks deceptively simple but does a lot under the covers; from what I can understand and glean from the documentation, this is what this configuration triggers:

1. Spring Integration message channels based on the classes that are bound to the @EnableBinding annotation are created. The WorkUnitsSource class above is the definition of a custom channel called "worksChannel" and looks like this:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface WorkUnitsSource {

    String CHANNEL_NAME = "worksChannel";

    @Output
    MessageChannel worksChannel();

}

2. Based on which "binder" implementation is available at runtime(say RabbitMQ, Kaffka, Redis, Gemfire), the channel in the previous step will be connected to the appropriate structures in the system - so for eg, I am want my "worksChannel" to in turn send messages to RabbitMQ, Spring Cloud Stream would take care of automatically creating a topic exchange in RabbitMQ

I wanted some further customizations in terms of how the data is sent to RabbitMQ - specifically, I wanted my domain objects to be serialized to json before being sent across, and I wanted to specify the name of the RabbitMQ exchange that the payload is sent to. This is controlled by certain configurations that can be attached to the channel the following way using a yaml file:

spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          contentType: application/json
          group: testgroup

One final detail is a way for the rest of the application to interact with Spring Cloud Stream; this can be done directly in Spring Integration by defining a messaging gateway:

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import works.service.domain.WorkUnit;

@MessagingGateway
public interface WorkUnitGateway {
 @Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)
 void generate(WorkUnit workUnit);

}

That is essentially it - Spring Cloud Stream would now wire up the entire Spring Integration flow and create the appropriate structures in RabbitMQ.
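
With the gateway in place, dispatching a work unit from anywhere in the application becomes a plain method call - a usage sketch, reusing the WorkUnit constructor from the sample:

// The gateway proxy generated by Spring Integration puts the payload on "worksChannel",
// from where Spring Cloud Stream routes it to the configured RabbitMQ exchange
WorkUnit sampleWorkUnit = new WorkUnit(UUID.randomUUID().toString(), "a sample definition");
workUnitGateway.generate(sampleWorkUnit);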


Consumer


Similar to the Producer, first I want to define the channel called "worksChannel" which would handle the incoming message from RabbitMQ:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface WorkUnitsSink {
    String CHANNEL_NAME = "worksChannel";

    @Input
    SubscribableChannel worksChannel();
}

and let Spring Cloud Stream create the channels and RabbitMQ bindings based on this definition:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBinding(WorkUnitsSink.class)
public class IntegrationConfiguration {}

To process the messages, Spring Cloud Stream provides a listener which can be created the following way:

@Service
public class WorkHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);

    @StreamListener(WorkUnitsSink.CHANNEL_NAME)
    public void process(WorkUnit workUnit) {
        LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());
    }
}

And finally the configuration which connects this channel to the RabbitMQ infrastructure expressed in a yaml file:

spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          group: testgroup


Now if the producer and any number of consumers were started up, the message sent via the producer would go to a RabbitMQ topic exchange as json, be retrieved by the consumer, deserialized to an object and passed on to the work processor.

A good amount of the boilerplate involved in creating the RabbitMQ infrastructure is now handled purely by convention by the Spring Cloud Stream libraries. Though Spring Cloud Stream attempts to provide a facade over raw Spring Integration, it is useful to have a basic knowledge of Spring Integration to use Spring Cloud Stream effectively.

The sample described here is available at my github repository

Monday, August 22, 2016

Integrating with Rabbit MQ using Spring Integration Java DSL

I recently attended the Spring One conference 2016 in Las Vegas and had the good fortune to see from near and far some of the people that I have admired for a long time in the software world. I personally met two of them who have actually merged some of my minor Spring Integration related contributions from a few years ago - Gary Russell and Artem Bilan - and they inspired me to look again at Spring Integration, which I have not used for a while.

I was once more reminded of how Spring Integration makes any complex enterprise integration scenario look easy. I am happy to see that the Spring Integration Java based DSL is now fully integrated into the Spring Integration umbrella, and that there are higher level abstractions like Spring Cloud Stream (introductions thanks to my good friend and a contributor to this project, Soby Chacko) which make some of the message driven scenarios even easier.

In this post I am just revisiting a very simple integration scenario with RabbitMQ and in a later post will re-implement it using Spring Cloud Stream.

Consider a scenario where two services are talking to each other via a RabbitMQ broker in between - one of them generating some kind of work, the other processing this work.



Producer


The Work unit producing/dispatching part can be expressed in code using Spring Integration Java DSL the following way:


@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate()))
                .get();
    }
}

This is eminently readable - the flow starts by reading a message off a channel called "worksChannel", transforms the message into json and dispatches it off to a RabbitMQ exchange using an outbound channel adapter. Now, how does the message get to the channel called "worksChannel"? I have configured it via a messaging gateway, an entry point into the Spring Integration world -

@MessagingGateway
public interface WorkUnitGateway {
 @Gateway(requestChannel = "worksChannel")
 void generate(WorkUnit workUnit);

}

So now, if a java client wanted to dispatch a "work unit" to RabbitMQ, the call would look like this:

WorkUnit sampleWorkUnit = new WorkUnit(UUID.randomUUID().toString(), definition);
workUnitGateway.generate(sampleWorkUnit);

I have brushed over a few things here - specifically the RabbitMQ configuration; that is run of the mill however and is available here.

Consumer

Along the lines of the producer, a consumer's flow would start by receiving a message from a RabbitMQ queue, transforming it to a domain model and then processing the message, expressed using the Spring Integration Java DSL the following way:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(connectionFactory, rabbitConfig.worksQueue()).concurrentConsumers(3))
                .transform(Transformers.fromJson(WorkUnit.class))
                .handle("workHandler", "process")
                .get();
    }
}

The code should be intuitive; the workHandler above is a simple Java pojo doing the very important job of just logging the payload, and looks like this:

@Service
public class WorkHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);

    public void process(WorkUnit workUnit) {
        LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());
    }
}


That is essentially it - Spring Integration provides an awesome facade to what would have been fairly complicated code had it been attempted using straight Java and the raw RabbitMQ libraries. Spring Cloud Stream makes this entire set-up even simpler and would be the topic of a future post.


I have posted this entire code at my github repo if you are interested in taking this for a spin.

Saturday, August 6, 2016

No downtime deployment using "Yet another" Cloud Foundry Gradle plugin

I have been trying my hand at writing a gradle plugin for deploying applications to Cloud Foundry and wrote about this plugin in my previous post. I have now enhanced this plugin with support for no-downtime deploys into Cloud Foundry using two approaches - an Autopilot style deployment and a more commonly used Blue-Green style deployment.

To jump into the meat of the plugin, once it is configured cleanly all you have to do is the following:

For an autopilot style

./gradlew cf-push-autopilot

and for a Blue-Green deployment:

./gradlew cf-push-blue-green

and the plugin tasks would take care of the rest.

What is being solved


If you use the Cloud Foundry CLI to push an application to Cloud Foundry, then the existing instances of the application are stopped, replaced and started up. This introduces a downtime for the application until the new instance is up. Just to demonstrate this behavior, the following graph represents steady traffic to a website while an application is pushed to Cloud Foundry - the 30 second blip is when the new app is being started up.


Autopilot and Blue-Green style deployments


Autopilot and Blue-Green styles of deployment fix the issue by carefully orchestrating the deployment of an application such that the external facing route always points to a working version of the application.

The plugin now natively performs all the steps needed for these two styles of no-downtime deployments.

Here is how the same graph looks with an Autopilot style deployment using the plugin - note that there is a slightly higher response time around the time the new application switches in. Once primed though, the response times smooth out:



and with a Blue-Green style deployment using this plugin


References:


1. The details about how to install and configure the plugin are available here - https://github.com/pivotalservices/ya-cf-app-gradle-plugin

2. A sample application configured with the plugin is here - https://github.com/bijukunjummen/cf-show-env

3. The load test using gatling is available here

Tuesday, July 19, 2016

Introducing "Yet another" Cloud foundry Gradle plugin

In the process of working on an automated Jenkins pipeline for deploying a Cloud Foundry application with two of my colleagues (thanks Mark Alston and Dave Malone!), I decided to try my hand at writing a Gradle plugin to perform some of the tasks that are typically done using a command line Cloud Foundry client.

Introducing the totally unimaginatively named "ya-cf-app-gradle-plugin", with a set of gradle tasks (dare I say opinionated!) that should help automate some of the routine steps involved in deploying a java application to a Cloud Foundry environment. The "ya" or the yet-another part is because this is just a stand-in plugin; the authoritative plugin for Cloud Foundry will ultimately reside with the excellent CF-Java-Client project.


I have provided an extensive README with the project's documentation that should help in getting started with using the plugin; the tasks should be fairly intuitive if you have previously worked with the CF cli.

Just as an example, once the gradle plugin is cleanly added into the build script, the following gradle tasks are available when listed by running the "./gradlew tasks" command:





All the tasks work off a configuration provided the following way in a cfConfig block in the buildscript:

apply plugin: 'cf-app'

cfConfig {
 //CF Details
 ccHost = "api.local.pcfdev.io"
 ccUser = "admin"
 ccPassword = "admin"
 org = "pcfdev-org"
 space = "pcfdev-space"

 //App Details
 name = "cf-show-env"
 hostName = "cf-show-env"
 filePath = "build/libs/cf-show-env-0.1.2-SNAPSHOT.jar"
 path = ""
 domain = "local.pcfdev.io"
 instances = 2
 memory = 512

 //Env and services
 buildpack = "https://github.com/cloudfoundry/java-buildpack.git"
 environment = ["JAVA_OPTS": "-Djava.security.egd=file:/dev/./urandom", "SPRING_PROFILES_ACTIVE": "cloud"]
 services  = ["mydb"]
}

Any overrides on top of the base configuration provided this way can be done by specifying gradle properties with a "cf.*" pattern. For example, a normal push of an application would look like this:

./gradlew cf-push

and a push with the name of the application and the host name overridden would look like this:

./gradlew cf-push -Pcf.name=Green -Pcf.hostName=demo-time-temp


All of the tasks follow the exact same pattern, depending on the cfConfig block as the authoritative source of properties along with the command line overrides. There is one task that can be used for retrieving some of the details of an app in Cloud Foundry - the "cf-get-app-detail" task. Say after deploying a canary instance of an app you wanted to run a quick test against it; the task would look along these lines, with a "project.cfConfig" structure populated with the app details once it is successfully invoked:

task acceptanceTest(type: Test, dependsOn: "cf-get-app-detail")  {
 doFirst() {
  systemProperty "url", "https://${project.cfConfig.applicationDetail.urls[0]}"
 }
 useJUnit {
  includeCategories 'test.AcceptanceTest'
 }
}


References:


1. The plugin is built on top of the excellent CF-Java-Client project
2. I have borrowed a lot of ideas from gradle-cf-plugin but is more or less a clean room implementation
3. Here is a sample project which makes use of the plugin.

Tuesday, July 5, 2016

Spring Cloud Zuul - Writing a Filter

The Netflix OSS project Zuul serves as a gateway to backend services and provides support for adding in edge features like security and routing. In the Zuul world, specific edge features are provided by components called Zuul Filters, and writing such a filter for a Spring Cloud based project is very simple. A good reference to adding a filter is here. Here I wanted to demonstrate two small features - deciding whether a filter should act on a request, and adding a header before forwarding the request.

Writing a Zuul Filter


Writing a Zuul Filter is very easy with Spring Cloud - all we need to do is add a Spring bean which extends ZuulFilter, so for this example it would look something like this:

import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.springframework.stereotype.Service;


@Service
public class PayloadTraceFilter extends ZuulFilter {

    private static final String HEADER="payload.trace";

    @Override
    public String filterType() {
        return "pre";
    }

    @Override
    public int filterOrder() {
        return 999;
    }

    @Override
    public boolean shouldFilter() {
      ....   
    }

    @Override
    public Object run() {
     ....
    }
}

Some high level details of this implementation: it has been marked as a "Filter type" of "pre", which means that this filter would be called before the request is dispatched to the backend service; filterOrder determines when this specific filter is called in the chain of filters; shouldFilter determines if this filter is invoked at all for this request; and run contains the logic for the filter.

So to my first consideration - whether this filter should act on the flow at all. This can be decided on a request by request basis; my logic is very simple - if the request uri starts with /samplesvc then this filter should act on the request.

@Override
public boolean shouldFilter() {
    RequestContext ctx = RequestContext.getCurrentContext();
    String requestUri = ctx.getRequest().getRequestURI();
    return requestUri.startsWith("/samplesvc");
}

and the second consideration on modifying the request headers to the backend service:

@Override
public Object run() {
    RequestContext ctx = RequestContext.getCurrentContext();
    ctx.addZuulRequestHeader("payload.trace", "true");
    return null;
}

A backing service getting such a request can look for the header and act accordingly, say in this specific case looking at the "payload.trace" header and deciding to log the incoming message:

@RequestMapping(value = "/message", method = RequestMethod.POST)
public Resource<MessageAcknowledgement> pongMessage(@RequestBody Message input, @RequestHeader("payload.trace") boolean tracePayload) {
    if (tracePayload) {
        LOGGER.info("Received Payload: {}", input.getPayload());
    }
....

Conclusion

As demonstrated here, Spring Cloud really makes it simple to add in Zuul filters for any edge needs. If you want to explore this sample a little further I have sample projects available in my github repo.

Wednesday, June 22, 2016

Spring Cloud Zuul Support - Configuring Timeouts

Spring Cloud provides support for Netflix Zuul - a toolkit for creating edge services with routing and filtering capabilities.

Zuul Proxy support is very comprehensively documented at the Spring Cloud site. My objective here is to focus on a small set of attributes relating to handling timeouts when dealing with the proxied services.

Target Service and Gateway


To study timeouts better I have created a sample service (code available here) which takes in a configurable "delay" parameter as part of the request body; a sample request/response looks something like this:

A sample request with a 5 second delay:

{
  "id": "1",
  "payload": "Hello",
  "delay_by": 5000,
  "throw_exception": false
}


and an expected response:

{
  "id": "1",
  "received": "Hello",
  "payload": "Hello!"
}


This service is registered with an id of "sample-svc" in Eureka, a Spring Cloud Zuul proxy on top of this service has the following configuration:

zuul:
  ignoredServices: '*'
  routes:
    samplesvc:
      path: /samplesvc/**
      stripPrefix: true
      serviceId: sample-svc

Essentially, this forwards all requests to the /samplesvc/** uri to a service disambiguated with the name "sample-svc" via Eureka.

I also have a UI on top of the gateway to make testing with different delays easier:


Service Delay Tests


The gateway behaves without any timeout related issues when a low "delay" parameter is added to the service call; however if the delay parameter is increased to just 1 to 1.5 seconds the gateway times out.

The reason is that if the gateway is set up to use Eureka, then the gateway uses the Netflix Ribbon component to make the actual call. Further, the Ribbon call is wrapped within Hystrix to ensure that the call remains fault tolerant. The first timeout we are hitting is because Hystrix has a very low delay tolerance threshold, and tweaking the Hystrix settings should get us past the first timeout.

hystrix:
  command:
    sample-svc:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 15000

Note that the Hystrix "Command Key" used for configuration is the name of the service as registered in Eureka.

This may be a little too fine grained for this specific Zuul call; if you are okay with tweaking it across the board then configuration along these lines should do the job:

hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 15000

With this change, a request to the service via the gateway with a delay of up to 5 seconds will now go through without any issues. If we were to go above 5 seconds though, we would get another timeout. We are now hitting Ribbon's timeout setting, which again can be configured in a fine grained way for the specific service call by tweaking configuration which looks like this:

sample-svc:
  ribbon:
    ReadTimeout: 15000


With both these timeout tweaks in place the gateway based call should now go through.

Conclusion


The purpose was not to show ways of setting arbitrarily high timeout values, but just to show how to set values that may be more appropriate for your applications. Sensible timeouts are very important to ensure that bad service behaviors don't cascade up to the users. One thing to note is that if the gateway is configured without Ribbon and Eureka, by specifying a direct url to a service, then these timeout settings are not relevant at all.

If you are interested in exploring this further, the samples are available here.

Tuesday, June 7, 2016

Spring-Reactive samples

Spring-Reactive aims to bring reactive programming support to Spring based projects, and this is expected to be available in the Spring 5 timeframe. My intention here is to exercise some of the very basic signatures for REST endpoints with this model.

Before I go ahead let me acknowledge that this entire sample is completely based on the samples which Sébastien Deleuze has put together here - https://github.com/sdeleuze/spring-reactive-playground

I wanted to consider three examples: first a case where the existing Java 8 CompletableFuture is returned as a type, second where RxJava's Observable is returned as a type, and third with Spring Reactor Core's Flux type.

Expected Protocol

The structure of the request and response messages handled by each of the three services is along these lines; all of them will take in a request which looks like this:

{
 "id":1,
  "delay_by": 2000,
  "payload": "Hello",
  "throw_exception": false
}


The delay_by attribute will cause the response to be delayed and throw_exception will make the response error out. A sane response will be the following:

{
  "id": "1",
  "received": "Hello",
  "payload": "Response Message"
}

I will be ignoring the exceptions for this post.

CompletableFuture as a return type


Consider a service which returns a java 8 CompletableFuture as a return type:

public CompletableFuture<MessageAcknowledgement> handleMessage(Message message) {
 return CompletableFuture.supplyAsync(() -> {
  Util.delay(message.getDelayBy());
  return new MessageAcknowledgement(message.getId(), message.getPayload(), "data from CompletableFutureService");
 }, futureExecutor);
}

The method signature of a Controller which calls this service looks like this now:

@RestController
public class CompletableFutureController {

 private final CompletableFutureService aService;

 @Autowired
 public CompletableFutureController(CompletableFutureService aService) {
  this.aService = aService;
 }

 @RequestMapping(path = "/handleMessageFuture", method = RequestMethod.POST)
 public CompletableFuture<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessage(message);
 }

}

When the CompletableFuture completes the framework will ensure that the response is marshalled back appropriately.

Rx Java Observable as a return type

Consider a service which returns a Rx Java Observable as a return type:

public Observable<MessageAcknowledgement> handleMessage(Message message) {
 logger.info("About to Acknowledge");
 return Observable.just(message)
   .delay(message.getDelayBy(), TimeUnit.MILLISECONDS)
   .flatMap(msg -> {
    if (msg.isThrowException()) {
     return Observable.error(new IllegalStateException("Throwing a deliberate exception!"));
    }
    return Observable.just(new MessageAcknowledgement(message.getId(), message.getPayload(), "From RxJavaService"));
   });
}

The controller invoking such a service can directly return the Observable as a type now, and the framework will ensure that once all the items have been emitted the response is marshalled correctly.

@RestController
public class RxJavaController {

 private final RxJavaService aService;

 @Autowired
 public RxJavaController(RxJavaService aService) {
  this.aService = aService;
 }

 @RequestMapping(path = "/handleMessageRxJava", method = RequestMethod.POST)
 public Observable<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  System.out.println("Got Message..");
  return this.aService.handleMessage(message);
 }

}

Note that since Observable represents a stream of 0 to many items, this time around the response is a json array.


Spring Reactor Core Flux as a return type


Finally, if the response type is a Flux type, the framework ensures that the response is handled cleanly. The service is along these lines:

public Flux<MessageAcknowledgement> handleMessage(Message message) {
 return Flux.just(message)
   .delay(Duration.ofMillis(message.getDelayBy()))
   .map(msg -> Tuple.of(msg, msg.isThrowException()))
   .flatMap(tup -> {
    if (tup.getT2()) {
     return Flux.error(new IllegalStateException("Throwing a deliberate Exception!"));
    }
    Message msg = tup.getT1();
    return Flux.just(new MessageAcknowledgement(msg.getId(), msg.getPayload(), "Response from ReactorService"));
   });
}

and a controller making use of such a service:

@RestController
public class ReactorController {

 private final ReactorService aService;

 @Autowired
 public ReactorController(ReactorService aService) {
  this.aService = aService;
 }

 @RequestMapping(path = "/handleMessageReactor", method = RequestMethod.POST)
 public Flux<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessage(message);
 }

}
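
As an aside, the map to a Tuple in the ReactorService above is not strictly necessary, since the exception flag can be read off the message inside the flatMap itself. A minimal alternative sketch, keeping the same operators as the original:

public Flux<MessageAcknowledgement> handleMessage(Message message) {
 return Flux.just(message)
   .delay(Duration.ofMillis(message.getDelayBy()))
   .flatMap(msg -> {
    if (msg.isThrowException()) {
     return Flux.error(new IllegalStateException("Throwing a deliberate Exception!"));
    }
    return Flux.just(new MessageAcknowledgement(msg.getId(), msg.getPayload(), "Response from ReactorService"));
   });
}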

Conclusion

This is just a sampling of the kinds of return types that the Spring Reactive project supports; the list of possible return types is much longer than this - here is a far more comprehensive example.

I look forward to when the reactive programming model becomes available in the core Spring framework.

The samples presented in this blog post are available at my github repository.

Saturday, May 28, 2016

Cloud Foundry Java Client - Streaming events

Cloud Foundry Java Client provides Java-based bindings for interacting with a running Cloud Foundry instance. One of the neat things about this project is that it has embraced Reactive Streams based APIs for its method signatures, specifically using the Reactor implementation; this is especially useful when consuming streaming data.

In this post I want to demonstrate a specific use case where this library really shines - in streaming events from Cloud Foundry.

Loggregator is the subsystem in Cloud Foundry responsible for aggregating all the logs produced within the system and provides ways for this information to be streamed out to external systems. The "Traffic Controller" component within Loggregator exposes a websocket based endpoint streaming out these events; the Cloud Foundry Java Client abstracts the underlying websocket client connection details and provides a neat way to consume this information.


As a prerequisite, you will need a running instance of Cloud Foundry to try out the sample, and the best way to get one working locally is to use PCF Dev.

Assuming that you have a running instance, the way to connect to this instance from code using the cf-java-client library is along the following lines:

SpringCloudFoundryClient cfClient = SpringCloudFoundryClient.builder()
                .host("api.local.pcfdev.io")
                .username("admin")
                .password("admin")
                .skipSslValidation(true)
                .build();

Using this, a client to the Traffic Controller can be created the following way:

DopplerClient dopplerClient = ReactorDopplerClient.builder()
      .cloudFoundryClient(cfClient)
      .build();


That is essentially it - the doppler client provides methods to stream the underlying events. If you are interested in all the unfiltered information (appropriately referred to as the firehose), you can get to it the following way:

Flux<Event> cfEvents = this.dopplerClient.firehose(
    FirehoseRequest.builder()
      .subscriptionId(UUID.randomUUID().toString()).build());

The result is a Flux type from the Reactor library encapsulating the streaming data, which can be observed by attaching a subscriber - for a basic example, a subscriber simply logging the events to the console looks like this:

cfEvents.subscribe(e -> LOGGER.info(e.toString()));
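
One practical note (this keep-alive is my addition, not something the sample needs since it runs as a Spring Boot application): subscribe() returns immediately, so in a plain main() method something has to keep the JVM alive while the events continue to stream in. A latch that is never released is one minimal way to do that:

// java.util.concurrent.CountDownLatch - never counted down here, so the process
// keeps streaming events until it is killed
CountDownLatch latch = new CountDownLatch(1);
cfEvents.subscribe(e -> LOGGER.info(e.toString()));
latch.await();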


The real power of Flux, however, is in the fluent operators that it provides. For example, if I were interested in a subset of the events - say just the application level logs - I would filter down the data, extract the log message and print it the following way:

cfEvents
 .filter(e -> LogMessage.class.isInstance(e))
 .map(e -> (LogMessage)e)
 .map(LogMessage::getMessage)
 .subscribe(LOGGER::info);
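
The stream can be narrowed down further, say to the logs of one specific application. Assuming the LogMessage type exposes the application guid through a getApplicationId() accessor (my assumption about the doppler model), the filter would look along these lines:

// The application guid can be obtained with "cf app <appname> --guid"
String applicationId = "an-application-guid";

cfEvents
 .filter(e -> LogMessage.class.isInstance(e))
 .map(e -> (LogMessage) e)
 .filter(logMessage -> applicationId.equals(logMessage.getApplicationId()))
 .map(LogMessage::getMessage)
 .subscribe(LOGGER::info);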

If you want to play with this sample, which as an added bonus has been Spring Boot enabled, I have it available in my github repository.

Tuesday, May 17, 2016

Spring Cloud with Turbine AMQP

I have previously blogged about using Spring Cloud with Turbine, a Netflix OSS library which provides a way to aggregate the information from Hystrix streams across a cluster.

The default aggregation flow is however pull-based, where Turbine requests the Hystrix stream from each instance in the cluster and aggregates it together - this tends to be fairly configuration heavy.


Spring Cloud Turbine AMQP offers a different model, where each application instance pushes the metrics from Hystrix commands to Turbine through a central RabbitMQ broker.


This blog post recreates the sample that I had configured previously using Spring Cloud support for AMQP - the entire sample is available at my github repo if you just want the code.

The changes are very minor for such a powerful feature; all that an application which wants to feed its Hystrix stream to the AMQP broker has to do is add these dependencies, expressed in Maven the following way:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-netflix-hystrix-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>


These dependencies now auto-configure all the connectivity details to a RabbitMQ topic exchange and start feeding the Hystrix stream data into this topic.
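
For completeness, this is the kind of Hystrix-wrapped call whose metrics end up being published - a sketch using the javanica @HystrixCommand annotation (the service and method names here are made up for illustration; the application class would also carry @EnableCircuitBreaker):

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.stereotype.Service;

@Service
public class RemoteCallService {

 // Every invocation runs as a Hystrix command; the resulting metrics are what the
 // hystrix-stream binder pushes to the RabbitMQ exchange
 @HystrixCommand(fallbackMethod = "fallBack")
 public String callRemoteService() {
  // call to a downstream system goes here
  return "a response";
 }

 public String fallBack() {
  return "a fallback response";
 }
}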

Similarly on the Turbine end all that needs to be done is to specify the appropriate dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-turbine-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

This would consume the Hystrix messages from RabbitMQ and would in turn expose an aggregated stream over an HTTP endpoint.

Using this aggregated stream, a Hystrix dashboard can then be pointed at the Turbine endpoint to visualize the metrics across the cluster.


The best way to try out the sample is using docker-compose; the README with the sample explains how to build the relevant docker images and bring everything up using docker-compose.

Monday, May 2, 2016

Approaches to binding a Spring Boot application to a service in Cloud Foundry

If you want to try out Cloud Foundry the simplest way to do that is to download the excellent PCF Dev or to create a trial account at the Pivotal Web Services site.

The rest of the post assumes that you have an installation of Cloud Foundry available to you and that you have a high level understanding of Cloud Foundry. The objective of this post is to list out the options you have for integrating your Java application with a service instance - this demo uses MySQL as a sample service to integrate with, but the approach is generic enough.

Overview of the Application

The application is a fairly simple Spring Boot app: a REST service exposing three domain types and their relationships, representing a university - Course, Teacher and Student. The domain instances are persisted to a MySQL database. The entire source code and the approaches are available at this github location if you want to jump ahead.

To try the application locally, first install a local MySQL server; on a Mac OS X box with homebrew available, the following set of commands can be run:

brew install mysql

mysql.server start
mysql -u root
# on the mysql prompt: 

CREATE USER 'univadmin'@'localhost' IDENTIFIED BY 'univadmin';
CREATE DATABASE univdb;
GRANT ALL ON univdb.* TO 'univadmin'@'localhost';

Bring up the Spring Boot app under the cf-db-services-sample-auto project:

mvn spring-boot:run

and an endpoint with sample data will be available at http://localhost:8080/courses.

Trying this application on Cloud Foundry

If you have an installation of PCF Dev running locally, you can try out a deployment of the application the following way:

cf api api.local.pcfdev.io --skip-ssl-validation
cf login # login with admin/admin credentials

Create a MySQL service instance:
cf create-service p-mysql 512mb mydb

and push the app! (manifest.yml provides the binding of the app to the service instance)
cf push

An endpoint should be available at http://cf-db-services-sample-auto.local.pcfdev.io/courses

Approaches to service connectivity


Now that we have an application that works locally and on a sample local Cloud Foundry, these are the approaches to connecting to a service instance.

Approach 1 - Do nothing, let the Java buildpack handle the connectivity details


This approach is demonstrated in the cf-db-services-sample-auto project. Here the connectivity to the local database has been specified using Spring Boot and looks like this:

---

spring:
  jpa:
    show-sql: true
    hibernate.ddl-auto: none
    database: MYSQL

  datasource:
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost/univdb?autoReconnect=true&useSSL=false
    username: univadmin
    password: univadmin

When this application is pushed to Cloud Foundry using the Java buildpack, a component called java-buildpack-auto-reconfiguration is injected into the application, which reconfigures the connectivity to the service based on the service binding present at runtime.


Approach 2 - Disable Auto reconfiguration and use runtime properties

This approach is demonstrated in the cf-db-services-sample-props project. When a service is bound to an application, there is a set of environment properties injected into the application under the key "VCAP_SERVICES". For this specific service the entry looks something along these lines:

"VCAP_SERVICES": {
  "p-mysql": [
   {
    "credentials": {
     "hostname": "mysql.local.pcfdev.io",
     "jdbcUrl": "jdbc:mysql://mysql.local.pcfdev.io:3306/cf_456d9e1e_e31e_43bc_8e94_f8793dffdad5?user=**\u0026password=***",
     "name": "cf_456d9e1e_e31e_43bc_8e94_f8793dffdad5",
     "password": "***",
     "port": 3306,
     "uri": "mysql://***:***@mysql.local.pcfdev.io:3306/cf_456d9e1e_e31e_43bc_8e94_f8793dffdad5?reconnect=true",
     "username": "***"
    },
    "label": "p-mysql",
    "name": "mydb",
    "plan": "512mb",
    "provider": null,
    "syslog_drain_url": null,
    "tags": [
     "mysql"
    ]
   }
  ]
 }

The raw JSON is a little unwieldy to consume; however, Spring Boot automatically converts this data into a flat set of properties that looks like this:

"vcap.services.mydb.plan": "512mb",
"vcap.services.mydb.credentials.username": "******",
"vcap.services.mydb.credentials.port": "******",
"vcap.services.mydb.credentials.jdbcUrl": "******",
"vcap.services.mydb.credentials.hostname": "******",
"vcap.services.mydb.tags[0]": "mysql",
"vcap.services.mydb.credentials.uri": "******",
"vcap.services.mydb.tags": "mysql",
"vcap.services.mydb.credentials.name": "******",
"vcap.services.mydb.label": "p-mysql",
"vcap.services.mydb.syslog_drain_url": "",
"vcap.services.mydb.provider": "",
"vcap.services.mydb.credentials.password": "******",
"vcap.services.mydb.name": "mydb",

Given this, the connectivity to the database can be specified in a Spring Boot application the following way - in an application.yml file:

spring:
  datasource:
    url: ${vcap.services.mydb.credentials.jdbcUrl}
    username: ${vcap.services.mydb.credentials.username}
    password: ${vcap.services.mydb.credentials.password}
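
Incidentally, since these flattened entries are just regular Spring properties, they can also be read programmatically like any other property. A small illustrative sketch (this diagnostics component is my addition and is not part of the sample):

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
@Profile("cloud")
public class BoundServiceDiagnostics {

 private static final Logger LOGGER = LoggerFactory.getLogger(BoundServiceDiagnostics.class);

 private final Environment environment;

 @Autowired
 public BoundServiceDiagnostics(Environment environment) {
  this.environment = environment;
 }

 @PostConstruct
 public void logBoundService() {
  // The flattened VCAP_SERVICES entries resolve like any other Spring property
  LOGGER.info("Bound service plan: {}", environment.getProperty("vcap.services.mydb.plan"));
  LOGGER.info("Bound service label: {}", environment.getProperty("vcap.services.mydb.label"));
 }
}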

One small catch though is that since I am now explicitly taking control of specifying the service connectivity, the runtime java-buildpack-auto-reconfiguration has to be disabled, which can be done through manifest metadata:

---
applications:
  - name: cf-db-services-sample-props
    path: target/cf-db-services-sample-props-1.0.0.RELEASE.jar
    memory: 512M
    buildpack: https://github.com/cloudfoundry/java-buildpack.git
    env:
      JAVA_OPTS: -Djava.security.egd=file:/dev/./urandom
      SPRING_PROFILES_ACTIVE: cloud
      JBP_CONFIG_SPRING_AUTO_RECONFIGURATION: '{enabled: false}'
    services:
      - mydb

Approach 3 - Using Spring Cloud Connectors

The third approach is to use the excellent Spring Cloud Connectors project; a configuration which specifies the service connectivity looks like this and is demonstrated in the cf-db-services-sample-connector sub-project:

@Configuration
@Profile("cloud")
public class CloudFoundryDatabaseConfig {

    @Bean
    public Cloud cloud() {
        return new CloudFactory().getCloud();
    }

    @Bean
    public DataSource dataSource() {
        DataSource dataSource = cloud().getServiceConnector("mydb", DataSource.class, null);
        return dataSource;
    }
}
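
The DataSource bean created through the connector is then consumed like any other bean. For example (a hypothetical usage, not part of the sample), it could back a JdbcTemplate, and the JPA infrastructure in the application would pick up the same bean automatically:

// Hypothetical additional bean showing the connector-provided DataSource being
// consumed - org.springframework.jdbc.core.JdbcTemplate
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
 return new JdbcTemplate(dataSource);
}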

Pros and Cons



These are the pros and cons of each of these approaches:

Approach 1 - Let the buildpack handle it

Pros:
1. Simple - an application that works locally will work on the cloud without any changes.

Cons:
1. Magical - the auto-reconfiguration may appear magical to someone who does not understand the underlying flow.
2. The number of service types supported is fairly limited - for example, if connectivity is required to Cassandra then auto-reconfiguration will not work.

Approach 2 - Explicit properties

Pros:
1. Fairly straightforward.
2. Follows the Spring Boot approach and uses some of the best practices of Boot based applications - for example, there is a certain order in which datasource connection pools are created, and all those best practices flow in with this approach.

Cons:
1. The auto-reconfiguration has to be explicitly disabled.
2. You need to know what the flattened properties look like.
3. A "cloud" profile may have to be manually injected through environment properties to differentiate local development from cloud deployment.
4. Difficult to encapsulate reusable connectivity to newer service types - say Cassandra or DynamoDB.

Approach 3 - Spring Cloud Connectors

Pros:
1. Simple to integrate.
2. Easy to add in reusable integrations to newer service types.

Cons:
1. Bypasses the optimizations of the Spring Boot connection pool configuration.

Conclusion


My personal preference is to go with Approach 2 as it most closely matches the Spring Boot defaults, notwithstanding the cons of the approach. If more complicated connectivity to a service is required, I will likely go with Approach 3. Your mileage may vary though.


References

1. Scott Frederick's spring-music has been a constant guide.
2. I have generously borrowed from Ben Hale's pong_matcher_spring sample.