I learned a few new things after writing that blog entry, thanks to Artem Bilan and wanted to document those learnings here:
So, first my original sample, here I have the following flow(the one's in bold):
- Take in a message of this type - "hello from spring integ"
- Split it up into individual words(hello, from, spring, integ)
- Send each word to a ActiveMQ queue
- Pick up the word fragments from the queue and capitalize each word
- Place the response back into a response queue
- Pick up the message, re-sequence based on the original sequence of the words
- Aggregate back into a sentence("HELLO FROM SPRING INTEG") and
- Return the sentence back to the calling application.
EchoFlowOutbound.java:
@Bean public DirectChannel sequenceChannel() { return new DirectChannel(); } @Bean public DirectChannel requestChannel() { return new DirectChannel(); } @Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from(requestChannel()) .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s")) .handle(jmsOutboundGateway()) .get(); } @Bean public IntegrationFlow flowOnReturnOfMessage() { return IntegrationFlows.from(sequenceChannel()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on(" ").join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null) .get(); } @Bean public JmsOutboundGateway jmsOutboundGateway() { JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway(); jmsOutboundGateway.setConnectionFactory(this.connectionFactory); jmsOutboundGateway.setRequestDestinationName("amq.outbound"); jmsOutboundGateway.setReplyChannel(sequenceChannel()); return jmsOutboundGateway; }
It turns out, based on Artem Bilan's feedback, that a few things can be optimized here.
First notice how I have explicitly defined two direct channels, "requestChannel" for starting the flow that takes in the string message and the "sequenceChannel" to handle the message once it returns back from the jms message queue, these can actually be totally removed and the flow made a little more concise this way:
@Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from("requestChannel") .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s")) .handle(jmsOutboundGateway()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on(" ").join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null) .get(); } @Bean public JmsOutboundGateway jmsOutboundGateway() { JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway(); jmsOutboundGateway.setConnectionFactory(this.connectionFactory); jmsOutboundGateway.setRequestDestinationName("amq.outbound"); return jmsOutboundGateway; }
"requestChannel" is now being implicitly created just by declaring a name for it. The sequence channel is more interesting, quoting Artem Bilan -
do not specify outputChannel for AbstractReplyProducingMessageHandler and rely on DSL, what it means is that here jmsOutboundGateway is a AbstractReplyProducingMessageHandler and its reply channel is implicitly derived by the DSL. Further, two methods which were earlier handling the flows for sending out the message to the queue and then continuing once the message is back, is collapsed into one. And IMHO it does read a little better because of this change.
The second good change and the topic of this article is the introduction of the Jms namespace factories, when I had written the previous blog article, DSL had support for defining the AMQ inbound/outbound adapter/gateway, now there is support for Jms based inbound/adapter adapter/gateways also, this simplifies the flow even further, the flow now looks like this:
@Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from("requestChannel") .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s")) .handle(Jms.outboundGateway(connectionFactory) .requestDestination("amq.outbound")) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on(" ").join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null) .get(); }
The inbound Jms part of the flow also simplifies to the following:
@Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(Jms.inboundGateway(connectionFactory) .destination("amq.outbound")) .transform((String s) -> s.toUpperCase()) .get(); }
Thus, to conclude, Spring Integration Java DSL is an exciting new way to concisely configure Spring Integration flows. It is already very impressive in how it simplifies the readability of flows, the introduction of the Jms namespace factories takes it even further for JMS based flows.
I have updated my sample application with the changes that I have listed in this article - https://github.com/bijukunjummen/rg-si