Saturday, May 31, 2014

Spring Integration Java DSL sample

A new Java based DSL has now been introduced for Spring Integration which makes it possible to define the Spring Integration message flows using pure java based configuration instead of using the Spring XML based configuration.

I tried the DSL for a sample Integration flow that I have - I call it the Rube Goldberg flow, for it follows a convoluted path in trying to capitalize a string passed in as input. The flow looks like this and does some crazy things to perform a simple task:




  1. It takes in a message of this type - "hello from spring integ"
  2. splits it up into individual words(hello, from, spring, integ)
  3. sends each word to a ActiveMQ queue
  4. from the queue the word fragments are picked up by a enricher to capitalize each word
  5. placing the response back into a response queue
  6. It is picked up, resequenced based on the original sequence of the words
  7. aggregated back into a sentence("HELLO FROM SPRING INTEG") and
  8. returned back to the application.

To start with Spring Integration Java DSL, a simple Xml based configuration to capitalize a String would look like this:

<channel id="requestChannel"/>

<gateway id="echoGateway" service-interface="rube.simple.EchoGateway" default-request-channel="requestChannel" />

<transformer input-channel="requestChannel" expression="payload.toUpperCase()" />  

There is nothing much going on here, a messaging gateway takes in the message passed in from the application, capitalizes it in a transformer and this is returned back to the application.

Expressing this in Spring Integration Java DSL:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class EchoFlow {

 @Bean
 public IntegrationFlow simpleEchoFlow() {
  return IntegrationFlows.from("requestChannel")
    .transform((String s) -> s.toUpperCase())
    .get();
 }
}

@MessagingGateway
public interface EchoGateway {
 @Gateway(requestChannel = "requestChannel")
 String echo(String message);
}

Do note that @MessagingGateway annotation is not a part of Spring Integration Java DSL, it is an existing component in Spring Integration and serves the same purpose as the gateway component in XML based configuration. I like the fact that the transformation can be expressed using typesafe Java 8 lambda expressions rather than the Spring-EL expression. Note that the transformation expression could have coded in quite few alternate ways:

??.transform((String s) -> s.toUpperCase())

Or:

??.<String, String>transform(s -> s.toUpperCase())

Or using method references:
??.<String, String>transform(String::toUpperCase)


Moving onto the more complicated Rube Goldberg flow to accomplish the same task, again starting with XML based configuration. There are two configurations to express this flow:

rube-1.xml: This configuration takes care of steps 1, 2, 3, 6, 7, 8 :
  1. It takes in a message of this type - "hello from spring integ"
  2. splits it up into individual words(hello, from, spring, integ)
  3. sends each word to a ActiveMQ queue
  4. from the queue the word fragments are picked up by a enricher to capitalize each word
  5. placing the response back into a response queue
  6. It is picked up, resequenced based on the original sequence of the words
  7. aggregated back into a sentence("HELLO FROM SPRING INTEG") and
  8. returned back to the application.

<channel id="requestChannel"/>

<!--Step 1, 8-->
<gateway id="echoGateway" service-interface="rube.complicated.EchoGateway" default-request-channel="requestChannel"
   default-reply-timeout="5000"/>

<channel id="toJmsOutbound"/>

<!--Step 2-->
<splitter input-channel="requestChannel" output-channel="toJmsOutbound" expression="payload.split('\s')"
    apply-sequence="true"/>

<channel id="sequenceChannel"/>

<!--Step 3-->
<int-jms:outbound-gateway request-channel="toJmsOutbound" reply-channel="sequenceChannel"
        request-destination="amq.outbound" extract-request-payload="true"/>


<!--On the way back from the queue-->
<channel id="aggregateChannel"/>

<!--Step 6-->
<resequencer input-channel="sequenceChannel" output-channel="aggregateChannel" release-partial-sequences="false"/>

<!--Step 7-->
<aggregator input-channel="aggregateChannel"
   expression="T(com.google.common.base.Joiner).on(' ').join(![payload])"/>

and rube-2.xml for steps 4, 5:
  1. It takes in a message of this type - "hello from spring integ"
  2. splits it up into individual words(hello, from, spring, integ)
  3. sends each word to a ActiveMQ queue
  4. from the queue the word fragments are picked up by a enricher to capitalize each word
  5. placing the response back into a response queue
  6. It is picked up, resequenced based on the original sequence of the words
  7. aggregated back into a sentence("HELLO FROM SPRING INTEG") and
  8. returned back to the application.

<channel id="enhanceMessageChannel"/>

<int-jms:inbound-gateway request-channel="enhanceMessageChannel" request-destination="amq.outbound"/>

<transformer input-channel="enhanceMessageChannel" expression="(payload + '').toUpperCase()"/>


Now, expressing this Rube Goldberg flow using Spring Integration Java DSL, the configuration looks like this, again in two parts:

EchoFlowOutbound.java:
@Bean
 public DirectChannel sequenceChannel() {
  return new DirectChannel();
 }

 @Bean
 public DirectChannel requestChannel() {
  return new DirectChannel();
 }

 @Bean
 public IntegrationFlow toOutboundQueueFlow() {
  return IntegrationFlows.from(requestChannel())
    .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
    .handle(jmsOutboundGateway())
    .get();
 }

 @Bean
 public IntegrationFlow flowOnReturnOfMessage() {
  return IntegrationFlows.from(sequenceChannel())
    .resequence()
    .aggregate(aggregate ->
      aggregate.outputProcessor(g ->
        Joiner.on(" ").join(g.getMessages()
          .stream()
          .map(m -> (String) m.getPayload()).collect(toList())))
      , null)
    .get();
 }

and EchoFlowInbound.java:
@Bean
public JmsMessageDrivenEndpoint jmsInbound() {
 return new JmsMessageDrivenEndpoint(listenerContainer(), messageListener());
}

@Bean
public IntegrationFlow inboundFlow() {
 return IntegrationFlows.from(enhanceMessageChannel())
   .transform((String s) -> s.toUpperCase())
   .get();
}

Again here the code is completely typesafe and is checked for any errors at development time rather than at runtime as with the XML based configuration. Again I like the fact that transformation, aggregation statements can be expressed concisely using Java 8 lamda expressions as opposed to Spring-EL expressions.

What I have not displayed here is some of the support code, to set up the activemq test infrastructure, this configuration continues to remain as xml and I have included this code in a sample github project.

All in all, I am very excited to see this new way of expressing the Spring Integration messaging flow using pure Java and I am looking forward to seeing its continuing evolution and may be even try and participate in its evolution in small ways.


Here is the entire working code in a github repo: https://github.com/bijukunjummen/rg-si


References and Acknowledgement:
  • Spring Integration Java DSL introduction blog article by Artem Bilan: https://spring.io/blog/2014/05/08/spring-integration-java-dsl-milestone-1-released
  • Spring Integration Java DSL website and wiki: https://github.com/spring-projects/spring-integration-extensions/wiki/Spring-Integration-Java-DSL-Reference. A lot of code has been shamelessly copied over from this wiki by me :-). Also, a big thanks to Artem for guidance on a question that I had
  • Webinar by Gary Russell on Spring Integration 4.0 in which Spring Integration Java DSL is covered in great detail.



No comments:

Post a Comment