Monday, December 19, 2011

Concurrency - Executors and Spring Integration

This is a follow up to a previous blog entry:

Thread Pool/Executors Based Implementation
A better approach than the raw thread version, is a Thread pool based one, where an appropriate thread pool size is defined based on the system where the task is running - Number of CPU's/(1-Blocking Coefficient of Task). Venkat Subramaniams book has more details:




First I defined a custom task to generate the Report Part, given the Report Part Request, this is implemented as a Callable:
public class ReportPartRequestCallable implements Callable<ReportPart> {
 private final ReportRequestPart reportRequestPart;
 private final ReportPartGenerator reportPartGenerator;

 public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) {
     this.reportRequestPart = reportRequestPart;
     this.reportPartGenerator = reportPartGenerator;
    }

 @Override
    public ReportPart call() {
    return this.reportPartGenerator.generateReportPart(reportRequestPart);
    } 
}

public class ExecutorsBasedReportGenerator implements ReportGenerator {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator.class);

    private ReportPartGenerator reportPartGenerator;

    private ExecutorService executors = Executors.newFixedThreadPool(10);

    @Override
    public Report generateReport(ReportRequest reportRequest) {
        List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>();
        List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
        for (ReportRequestPart reportRequestPart : reportRequestParts) {
            tasks.add(new ReportPartRequestCallable(reportRequestPart, reportPartGenerator));
        }

        List<Future<ReportPart>> responseForReportPartList;
        List<ReportPart> reportParts = new ArrayList<ReportPart>();
        try {
            responseForReportPartList = executors.invokeAll(tasks);
            for (Future<ReportPart> reportPartFuture : responseForReportPartList) {
                reportParts.add(reportPartFuture.get());
            }

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return new Report(reportParts);
    }

 ......
}

Here a thread pool is created using the Executors.newFixedThreadPool(10) call, with a pool size of 10, a callable task is generated for each of the report request parts, and handed over to the threadpool using the ExecutorService abstraction -
responseForReportPartList = executors.invokeAll(tasks);
this call returns a List of Futures, which support a get() method which is a blocking call on the response to be available.

This is clearly a much better implementation compared to the raw thread version, the number of threads is constrained to a manageable number under load.


Spring Integration Based Implementation
The approach that I personally like the most is using Spring Integration, the reason is that with Spring Integration I focus on the components doing the different tasks and leave it upto Spring Integration to wire the flow together, using a xml based or annotation based configuration. Here I will be using a XML based configuration :

The components in my case are:
1. The component to generate the report part, given the report part request, which I had shown earlier.
2. A component to split the report request to report request parts:
public class DefaultReportRequestSplitter implements ReportRequestSplitter{
 @Override
 public List<ReportRequestPart> split(ReportRequest reportRequest) {
  return reportRequest.getRequestParts();
 }
}

3. A component to assemble/aggregate the report parts into a whole report:
public class DefaultReportAggregator implements ReportAggregator{

    @Override
    public Report aggregate(List<ReportPart> reportParts) {
        return new Report(reportParts);
    }

}

And that is all the java code that is required with Spring Integration, the rest of the is wiring - here I have used a Spring integration configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans ....

    <int:channel id="report.partsChannel"/>
    <int:channel id="report.reportChannel"/>
    <int:channel id="report.partReportChannel">
        <int:queue capacity="50"/>
    </int:channel>  
    <int:channel id="report.joinPartsChannel"/>


 <int:splitter id="splitter" ref="reportsPartSplitter" method="split" 
        input-channel="report.partsChannel" output-channel="report.partReportChannel"/>
    
    <task:executor id="reportPartGeneratorExecutor" pool-size="10" queue-capacity="50" />
    
 <int:service-activator id="reportsPartServiceActivator"  ref="reportPartReportGenerator" method="generateReportPart" 
            input-channel="report.partReportChannel" output-channel="report.joinPartsChannel">
    <int:poller task-executor="reportPartGeneratorExecutor" fixed-delay="500">
    </int:poller>
 </int:service-activator>

    <int:aggregator ref="reportAggregator" method="aggregate" 
            input-channel="report.joinPartsChannel" output-channel="report.reportChannel" ></int:aggregator> 

    <int:gateway id="reportGeneratorGateway" service-interface="org.bk.sisample.springintegration.ReportGeneratorGateway" 
           default-request-channel="report.partsChannel" default-reply-channel="report.reportChannel"/>
    
    <bean name="reportsPartSplitter" class="org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter"></bean>
    <bean name="reportPartReportGenerator" class="org.bk.sisample.processors.DummyReportPartGenerator"/>
    <bean name="reportAggregator" class="org.bk.sisample.springintegration.processors.DefaultReportAggregator"/>
    <bean name="reportGenerator" class="org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator"/>

</beans>

Spring Source Tool Suite provides a great way of visualizing this file:
this matches perfectly with my original view of the user flow:

In the Spring Integration version of the code, I have defined the different components to handle the different parts of the flow:
1. A splitter to convert a report request to report request parts:
<int:splitter id="splitter" ref="reportsPartSplitter" method="split" 
        input-channel="report.partsChannel" output-channel="report.partReportChannel"/>

2. A service activator component to generate a report part from a report part request:

<int:service-activator id="reportsPartServiceActivator"  ref="reportPartReportGenerator" method="generateReportPart" 
            input-channel="report.partReportChannel" output-channel="report.joinPartsChannel">
    <int:poller task-executor="reportPartGeneratorExecutor" fixed-delay="500">
    </int:poller>
 </int:service-activator>
3. An aggregator to join the report parts back to a report, and is intelligent enough to correlate the original split report requests appropriately without any explicit coding required for it:
<int:aggregator ref="reportAggregator" method="aggregate" 
            input-channel="report.joinPartsChannel" output-channel="report.reportChannel" ></int:aggregator> 


What is interesting in this code is that, like in the executors based sample, the number of threads that services each of these components is completely configurable using the xml file, by using appropriate channels to connect the different components together and by using task executors with the thread pool size set as attribute of the executor.

In this code, I have defined a queue channel where the report request parts come in:

<int:channel id="report.partReportChannel">
        <int:queue capacity="50"/>
    </int:channel>  


and is serviced by the service activator component, using a task executor with a thread pool of size 10, and a capacity of 50:

<task:executor id="reportPartGeneratorExecutor" pool-size="10" queue-capacity="50" />
    
 <int:service-activator id="reportsPartServiceActivator"  ref="reportPartReportGenerator" method="generateReportPart" 
            input-channel="report.partReportChannel" output-channel="report.joinPartsChannel">
    <int:poller task-executor="reportPartGeneratorExecutor" fixed-delay="500">
    </int:poller>
 </int:service-activator>


All this through configuration!


The entire codebase for this sample is available at this github location: https://github.com/bijukunjummen/si-sample

2 comments:

  1. dude you rock!! really good article. Surprised I'm the first commenter.

    ReplyDelete
  2. So I have a question regarding the Spring integration part related to error handling.

    Before Splitter, your payload type is RequestReport model and after splitter the payload type has changed to requestReportPart
    I have a same scenario where my splitter impl takes modelA and returns List and while error handling, If any exception is encountered, I have to put back the message into same queue for retry.

    Now if exception occurs after splitter, (because i lost the initial payload - modelA) and converted it into list. Is there a way by which in the error handler can I get the original message payload back?

    Extra info : the way i am handling error handling is there is one error channel on the inbound adapter.

    ReplyDelete