I tried the DSL for a sample Integration flow that I have - I call it the Rube Goldberg flow, for it follows a convoluted path in trying to capitalize a string passed in as input. The flow looks like this and does some crazy things to perform a simple task:
- It takes in a message of this type - "hello from spring integ"
- splits it up into individual words(hello, from, spring, integ)
- sends each word to a ActiveMQ queue
- from the queue the word fragments are picked up by a enricher to capitalize each word
- placing the response back into a response queue
- It is picked up, resequenced based on the original sequence of the words
- aggregated back into a sentence("HELLO FROM SPRING INTEG") and
- returned back to the application.
To start with Spring Integration Java DSL, a simple Xml based configuration to capitalize a String would look like this:
1 2 3 4 5 | <channel id= "requestChannel" /> <gateway id= "echoGateway" service- interface = "rube.simple.EchoGateway" default -request-channel= "requestChannel" /> <transformer input-channel= "requestChannel" expression= "payload.toUpperCase()" /> |
There is nothing much going on here, a messaging gateway takes in the message passed in from the application, capitalizes it in a transformer and this is returned back to the application.
Expressing this in Spring Integration Java DSL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | @Configuration @EnableIntegration @IntegrationComponentScan @ComponentScan public class EchoFlow { @Bean public IntegrationFlow simpleEchoFlow() { return IntegrationFlows.from( "requestChannel" ) .transform((String s) -> s.toUpperCase()) .get(); } } @MessagingGateway public interface EchoGateway { @Gateway (requestChannel = "requestChannel" ) String echo(String message); } |
Do note that @MessagingGateway annotation is not a part of Spring Integration Java DSL, it is an existing component in Spring Integration and serves the same purpose as the gateway component in XML based configuration. I like the fact that the transformation can be expressed using typesafe Java 8 lambda expressions rather than the Spring-EL expression. Note that the transformation expression could have coded in quite few alternate ways:
1 | ??.transform((String s) -> s.toUpperCase()) |
Or:
1 | ??.<String, String>transform(s -> s.toUpperCase()) |
Or using method references:
1 | ??.<String, String>transform(String::toUpperCase) |
Moving onto the more complicated Rube Goldberg flow to accomplish the same task, again starting with XML based configuration. There are two configurations to express this flow:
rube-1.xml: This configuration takes care of steps 1, 2, 3, 6, 7, 8 :
- It takes in a message of this type - "hello from spring integ"
- splits it up into individual words(hello, from, spring, integ)
- sends each word to a ActiveMQ queue
- from the queue the word fragments are picked up by a enricher to capitalize each word
- placing the response back into a response queue
- It is picked up, resequenced based on the original sequence of the words
- aggregated back into a sentence("HELLO FROM SPRING INTEG") and
- returned back to the application.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | <channel id= "requestChannel" /> <!--Step 1 , 8 --> <gateway id= "echoGateway" service- interface = "rube.complicated.EchoGateway" default -request-channel= "requestChannel" default -reply-timeout= "5000" /> <channel id= "toJmsOutbound" /> <!--Step 2 --> <splitter input-channel= "requestChannel" output-channel= "toJmsOutbound" expression= "payload.split('\s')" apply-sequence= "true" /> <channel id= "sequenceChannel" /> <!--Step 3 --> < int -jms:outbound-gateway request-channel= "toJmsOutbound" reply-channel= "sequenceChannel" request-destination= "amq.outbound" extract-request-payload= "true" /> <!--On the way back from the queue--> <channel id= "aggregateChannel" /> <!--Step 6 --> <resequencer input-channel= "sequenceChannel" output-channel= "aggregateChannel" release-partial-sequences= "false" /> <!--Step 7 --> <aggregator input-channel= "aggregateChannel" expression= "T(com.google.common.base.Joiner).on(' ').join(![payload])" /> |
and rube-2.xml for steps 4, 5:
- It takes in a message of this type - "hello from spring integ"
- splits it up into individual words(hello, from, spring, integ)
- sends each word to a ActiveMQ queue
- from the queue the word fragments are picked up by a enricher to capitalize each word
- placing the response back into a response queue
- It is picked up, resequenced based on the original sequence of the words
- aggregated back into a sentence("HELLO FROM SPRING INTEG") and
- returned back to the application.
1 2 3 4 5 | <channel id= "enhanceMessageChannel" /> < int -jms:inbound-gateway request-channel= "enhanceMessageChannel" request-destination= "amq.outbound" /> <transformer input-channel= "enhanceMessageChannel" expression= "(payload + '').toUpperCase()" /> |
Now, expressing this Rube Goldberg flow using Spring Integration Java DSL, the configuration looks like this, again in two parts:
EchoFlowOutbound.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | @Bean public DirectChannel sequenceChannel() { return new DirectChannel(); } @Bean public DirectChannel requestChannel() { return new DirectChannel(); } @Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from(requestChannel()) .split(s -> s.applySequence( true ).get().getT2().setDelimiters( "\\s" )) .handle(jmsOutboundGateway()) .get(); } @Bean public IntegrationFlow flowOnReturnOfMessage() { return IntegrationFlows.from(sequenceChannel()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on( " " ).join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null ) .get(); } |
and EchoFlowInbound.java:
1 2 3 4 5 6 7 8 9 10 11 | @Bean public JmsMessageDrivenEndpoint jmsInbound() { return new JmsMessageDrivenEndpoint(listenerContainer(), messageListener()); } @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(enhanceMessageChannel()) .transform((String s) -> s.toUpperCase()) .get(); } |
Again here the code is completely typesafe and is checked for any errors at development time rather than at runtime as with the XML based configuration. Again I like the fact that transformation, aggregation statements can be expressed concisely using Java 8 lamda expressions as opposed to Spring-EL expressions.
What I have not displayed here is some of the support code, to set up the activemq test infrastructure, this configuration continues to remain as xml and I have included this code in a sample github project.
All in all, I am very excited to see this new way of expressing the Spring Integration messaging flow using pure Java and I am looking forward to seeing its continuing evolution and may be even try and participate in its evolution in small ways.
Here is the entire working code in a github repo: https://github.com/bijukunjummen/rg-si
References and Acknowledgement:
- Spring Integration Java DSL introduction blog article by Artem Bilan: https://spring.io/blog/2014/05/08/spring-integration-java-dsl-milestone-1-released
- Spring Integration Java DSL website and wiki: https://github.com/spring-projects/spring-integration-extensions/wiki/Spring-Integration-Java-DSL-Reference. A lot of code has been shamelessly copied over from this wiki by me :-). Also, a big thanks to Artem for guidance on a question that I had
- Webinar by Gary Russell on Spring Integration 4.0 in which Spring Integration Java DSL is covered in great detail.
No comments:
Post a Comment