Saturday, December 27, 2014

Spring retry - ways to integrate with your project

If you have a need to implement robust retry logic in your code, a proven way would be to use the spring retry library. My objective here is not to show how to use the spring retry project itself, but in demonstrating different ways that it can be integrated into your codebase.

Consider a service to invoke an external system:

package retry.service;

public interface RemoteCallService {
    String call() throws Exception;
}


Assume that this call can fail and you want the call to be retried thrice with a 2 second delay each time the call fails, so to simulate this behavior I have defined a mock service using Mockito this way, note that this is being returned as a mocked Spring bean:

@Bean
public RemoteCallService remoteCallService() throws Exception {
    RemoteCallService remoteService = mock(RemoteCallService.class);
    when(remoteService.call())
            .thenThrow(new RuntimeException("Remote Exception 1"))
            .thenThrow(new RuntimeException("Remote Exception 2"))
            .thenReturn("Completed");
    return remoteService;
}
So essentially this mocked service fails 2 times and succeeds with the third call.

And this is the test for the retry logic:

public class SpringRetryTests {

    @Autowired
    private RemoteCallService remoteCallService;

    @Test
    public void testRetry() throws Exception {
        String message = this.remoteCallService.call();
        verify(remoteCallService, times(3)).call();
        assertThat(message, is("Completed"));
    }
}

We are ensuring that the service is called 3 times to account for the first two failed calls and the third call which succeeds.

If we were to directly incorporate spring-retry at the point of calling this service, then the code would have looked like this:
@Test
public void testRetry() throws Exception {
    String message = this.retryTemplate.execute(context -> this.remoteCallService.call());
    verify(remoteCallService, times(3)).call();
    assertThat(message, is("Completed"));
}

This is not ideal however, a better way would be where the callers don't have have to be explicitly aware of the fact that there is a retry logic in place.

Given this, the following are the approaches to incorporate Spring-retry logic.

Approach 1: Custom Aspect to incorporate Spring-retry

This approach should be fairly intuitive as the retry logic can be considered a cross cutting concern and a good way to implement a cross cutting concern is using Aspects. An aspect which incorporates the Spring-retry would look something along these lines:

package retry.aspect;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.support.RetryTemplate;

@Aspect
public class RetryAspect {

    private static Logger logger = LoggerFactory.getLogger(RetryAspect.class);

    @Autowired
    private RetryTemplate retryTemplate;

    @Pointcut("execution(* retry.service..*(..))")
    public void serviceMethods() {
        //
    }

    @Around("serviceMethods()")
    public Object aroundServiceMethods(ProceedingJoinPoint joinPoint) {
        try {
            return retryTemplate.execute(retryContext -> joinPoint.proceed());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
}

This aspect intercepts the remote service call and delegates the call to the retryTemplate. A full working test is here.

Approach 2: Using Spring-retry provided advice

Out of the box Spring-retry project provides an advice which takes care of ensuring that targeted services can be retried. The AOP configuration to weave the advice around the service requires dealing with raw xml as opposed to the previous approach where the aspect can be woven using Spring Java configuration. The xml configuration looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

 <aop:config>
  <aop:pointcut id="transactional"
       expression="execution(* retry.service..*(..))" />
  <aop:advisor pointcut-ref="transactional"
      advice-ref="retryAdvice" order="-1"/>
 </aop:config>

</beans>

The full working test is here.

Approach 3: Declarative retry logic

This is the recommended approach, you will see that the code is far more concise than with the previous two approaches. With this approach, the only thing that needs to be done is to declaratively indicate which methods need to be retried:

package retry.service;

import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;

public interface RemoteCallService {
    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000))
    String call() throws Exception;
}

and a full test which makes use of this declarative retry logic, also available here:

package retry;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import retry.service.RemoteCallService;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.*;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class SpringRetryDeclarativeTests {

    @Autowired
    private RemoteCallService remoteCallService;

    @Test
    public void testRetry() throws Exception {
        String message = this.remoteCallService.call();
        verify(remoteCallService, times(3)).call();
        assertThat(message, is("Completed"));
    }

    @Configuration
    @EnableRetry
    public static class SpringConfig {

        @Bean
        public RemoteCallService remoteCallService() throws Exception {
            RemoteCallService remoteService = mock(RemoteCallService.class);
            when(remoteService.call())
                    .thenThrow(new RuntimeException("Remote Exception 1"))
                    .thenThrow(new RuntimeException("Remote Exception 2"))
                    .thenReturn("Completed");
            return remoteService;
        }
    }
}

The @EnableRetry annotation activates the processing of @Retryable annotated methods and internally uses logic along the lines of approach 2 without the end user needing to be explicit about it.

I hope this gives you a slightly better taste for how to incorporate Spring-retry in your project. All the code that I have demonstrated here is also available in my github project here: https://github.com/bijukunjummen/test-spring-retry

Tuesday, December 23, 2014

Solving "Water buckets" problem using Scala

I recently came across a puzzle called the "Water Buckets" problem in this book, which totally stumped me.


You have a 12-gallon bucket, an 8-gallon bucket and a 5-gallon bucket. The 12-gallon bucket is full of water and the other two are empty. Without using any additional water how can you divide the twelve gallons of water equally so that two of the three buckets have exactly 6 gallons of water in them?


I and my nephew spent a good deal of time trying to solve it and ultimately gave up.

I remembered then that I have seen a programmatic solution to a similar puzzle being worked out in the "Functional Programming Principles in Scala" Coursera course by Martin Odersky.

This is the gist to the solution completely copied from the course:



and running this program spits out the following 7 step solution! (index 0 is the 12-gallon bucket, 1 is the 8-gallon bucket and 2 is the 5-gallon bucket)

Pour(0,1) 
Pour(1,2) 
Pour(2,0) 
Pour(1,2) 
Pour(0,1) 
Pour(1,2) 
Pour(2,0)

If you are interested in learning more about the code behind this solution, the best way is to follow the week 7 of the Coursera course that I have linked above, Martin Odersky does a fantastic job of seemingly coming up with a solution on the fly!.

Saturday, December 13, 2014

RabbitMQ - Processing messages serially using Spring integration Java DSL

If you ever have a need to process messages serially with RabbitMQ with a cluster of listeners processing the messages, the best way that I have seen is to use a "exclusive consumer" flag on a listener with 1 thread on each listener processing the messages.

Exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and 1 thread on that consumer ensures that the messages are processed serially. There is a catch however, I will go over it later.

Let me demonstrate this behavior with a Spring Boot and Spring Integration based RabbitMQ message consumer.

First, this is the configuration for setting up a queue using Spring java configuration, note that since this is a Spring Boot application, it automatically creates a RabbitMQ connection factory when the Spring-amqp library is added to the list of dependencies:

@Configuration
@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public Queue sampleQueue() {
        return new Queue("sample.queue", true, false, false);
    }

}

Given this sample queue, a listener which gets the messages from this queue and processes them looks like this, the flow is written using the excellent Spring integration Java DSL library:

@Configuration
public class RabbitInboundFlow {
    private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);

    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(this.connectionFactory);
        listenerContainer.setQueues(this.rabbitConfig.sampleQueue());
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setExclusive(true);
        return listenerContainer;
    }

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer()))
                .transform(Transformers.objectToString())
                .handle((m) -> {
                    logger.info("Processed  {}", m.getPayload());
                })
                .get();
    }

}


The flow is very concisely expressed in the inboundFlow method, a message payload from RabbitMQ is transformed from byte array to String and finally processed by simply logging the message to the logs

The important part of the flow is the listener configuration, note the flag which sets the consumer to be an exclusive consumer and within this consumer the number of threads processing is set to 1. Given this even if multiple instances of the application is started up only 1 of the listeners will be able to connect and process messages.


Now for the catch, consider a case where the processing of messages takes a while to complete and rolls back during processing of the message. If the instance of the application handling the message were to be stopped in the middle of processing such a message, then the behavior is a different instance will start handling the messages in the queue, when the stopped instance rolls back the message, the rolled back message is then delivered to the new exclusive consumer, thus getting a message out of order.

If you are interested in exploring this further, here is a github project to play with this feature: https://github.com/bijukunjummen/test-rabbit-exclusive