Saturday, December 17, 2011

Concurrency - Sequential and Raw Thread

I worked on a project a while back, where the report flow was along these lines:



  1. The user requests a report
  2. The report request is translated into smaller parts/sections
  3. A report generator produces the report for each part, based on the type of the part/section
  4. The constituent report parts are reassembled into a final report and returned to the user

My objective is to show how I progressed from a bad implementation to a fairly good one.

The basic building blocks are best demonstrated by a unit test.
This is a test helper which generates a sample report request, with its constituent report request parts:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FixtureGenerator {
    public static ReportRequest generateReportRequest() {
        Map<String, String> attributes = new HashMap<String, String>();
        attributes.put("user", "user");
        Context context = new Context(attributes);

        // One request part per report section, all sharing the same context
        List<ReportRequestPart> requestParts = new ArrayList<ReportRequestPart>();
        requestParts.add(new ReportRequestPart(Section.HEADER, context));
        requestParts.add(new ReportRequestPart(Section.SECTION1, context));
        requestParts.add(new ReportRequestPart(Section.SECTION2, context));
        requestParts.add(new ReportRequestPart(Section.SECTION3, context));
        requestParts.add(new ReportRequestPart(Section.FOOTER, context));

        return new ReportRequest(requestParts);
    }
}
And the test for the report generation:
public class SequentialReportGeneratorTest {
    // The test class name is assumed; the reportGenerator and logger fields are not shown here.

    @Test
    public void testSequentialReportGeneratorTime() {
        long startTime = System.currentTimeMillis();
        Report report = this.reportGenerator.generateReport(FixtureGenerator.generateReportRequest());
        long timeForReport = System.currentTimeMillis() - startTime;
        assertThat(report.getSectionReports().size(), is(5));
        logger.error(String.format("Sequential Report Generator : %s ms", timeForReport));
    }
}

The component which generates a part of the report is a dummy implementation with a 2-second delay to simulate an I/O-intensive call:
public class DummyReportPartGenerator implements ReportPartGenerator{

 @Override
 public ReportPart generateReportPart(ReportRequestPart reportRequestPart) {
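  try {
   // Deliberately introduce a delay to simulate a slow I/O call
   Thread.sleep(2000);
  } catch (InterruptedException e) {
   // Restore the interrupt flag so callers can still see the interruption
   Thread.currentThread().interrupt();
  }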
  return new ReportPart(reportRequestPart.getSection(), "Report for " + reportRequestPart.getSection());
 }
}

Sequential Implementation
Given this base set of classes, my first, naive sequential implementation is the following:
public class SequentialReportGenerator implements ReportGenerator {
 private ReportPartGenerator reportPartGenerator;

 @Override
 public Report generateReport(ReportRequest reportRequest){
  List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
  List<ReportPart> reportSections = new ArrayList<ReportPart>();
  for (ReportRequestPart reportRequestPart: reportRequestParts){
   reportSections.add(reportPartGenerator.generateReportPart(reportRequestPart));
  }
  return new Report(reportSections);
 }
 
 
......
}

Obviously, for a report request with 5 parts, each taking 2 seconds to fulfil, the report takes about 10 seconds to be returned to the user.
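To make the arithmetic concrete, here is a minimal, self-contained sketch of the same sequential loop. It is not the project code: the class SequentialTimingSketch, the method names and the use of plain strings in place of ReportRequestPart and ReportPart are all assumptions made for illustration, and the delay is shortened to 200 ms so that it runs quickly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SequentialTimingSketch {

    // Hypothetical stand-in for DummyReportPartGenerator:
    // sleeps for delayMillis, then returns the text for the section.
    static String generatePart(String section, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Report for " + section;
    }

    // Generates every part one after another, in request order,
    // so the total time is roughly (number of parts) x (delay per part).
    static List<String> generateSequentially(List<String> sections, long delayMillis) {
        List<String> parts = new ArrayList<String>();
        for (String section : sections) {
            parts.add(generatePart(section, delayMillis));
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> sections = Arrays.asList("HEADER", "SECTION1", "SECTION2", "SECTION3", "FOOTER");
        long start = System.nanoTime();
        List<String> parts = generateSequentially(sections, 200);
        long elapsedMillis = (System.nanoTime() - start) / 1000000;
        System.out.println(parts.size() + " parts in about " + elapsedMillis + " ms");
    }
}
```

With 5 sections and a 200 ms delay the elapsed time should come out at roughly one second, which is the same 5 x delay pattern as the 10 seconds above.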

It begs to be made concurrent.

Raw Thread Based Implementation
The first concurrent implementation, not good but better than the sequential one, is the following. A thread is spawned for every report request part, the caller waits for all the report parts to be generated (using the thread.join() method), and the pieces are then collected in request order. Since the five parts now run in parallel, the report takes roughly 2 seconds instead of 10.

public class RawThreadBasedReportGenerator implements ReportGenerator {
    private static final Logger logger = LoggerFactory.getLogger(RawThreadBasedReportGenerator.class);

    private ReportPartGenerator reportPartGenerator;

    @Override
    public Report generateReport(ReportRequest reportRequest) {
        List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
        List<Thread> threads = new ArrayList<Thread>();
        List<ReportPartRequestRunnable> runnablesList = new ArrayList<ReportPartRequestRunnable>();
        for (ReportRequestPart reportRequestPart : reportRequestParts) {
            ReportPartRequestRunnable reportPartRequestRunnable = new ReportPartRequestRunnable(reportRequestPart, reportPartGenerator);
            runnablesList.add(reportPartRequestRunnable);
            Thread thread = new Thread(reportPartRequestRunnable);
            threads.add(thread);
            thread.start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        List<ReportPart> reportParts = new ArrayList<ReportPart>();

        for (ReportPartRequestRunnable reportPartRequestRunnable : runnablesList) {
            reportParts.add(reportPartRequestRunnable.getReportPart());
        }

        return new Report(reportParts);

    }    
    .....
}
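The ReportPartRequestRunnable used above is not shown in the listing. A minimal sketch of what such a class could look like follows. The names PartGenerator and PartRunnable, and the use of plain strings instead of ReportRequestPart and ReportPart, are assumptions made to keep the example self-contained; the real class may well differ.

```java
class RunnableSketch {

    // Hypothetical stand-in for the ReportPartGenerator interface.
    interface PartGenerator {
        String generate(String section);
    }

    // Assumed shape of ReportPartRequestRunnable: run the generator on the
    // worker thread and keep the result for the caller to collect later.
    static class PartRunnable implements Runnable {
        private final String section;
        private final PartGenerator generator;
        // Written by the worker thread, read by the caller after join().
        // join() already makes the write visible; volatile is a safety net
        // in case the result is read without joining first.
        private volatile String result;

        PartRunnable(String section, PartGenerator generator) {
            this.section = section;
            this.generator = generator;
        }

        @Override
        public void run() {
            result = generator.generate(section);
        }

        String getResult() {
            return result;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PartGenerator generator = new PartGenerator() {
            @Override
            public String generate(String section) {
                return "Report for " + section;
            }
        };
        PartRunnable runnable = new PartRunnable("HEADER", generator);
        Thread thread = new Thread(runnable);
        thread.start();
        thread.join(); // wait for the worker before reading the result
        System.out.println(runnable.getResult()); // prints "Report for HEADER"
    }
}
```

A Runnable cannot return a value, which is why the result is stored in a field and read back once the thread has been joined.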

The danger with this approach is that a new thread is created for every report part, so in a real-world scenario, if 100 simultaneous requests come in and each spawns 5 threads, this can end up creating 500 costly threads in the VM.
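The thread count can be checked with a small sketch that spawns one thread per part for a number of simulated requests, the same way the raw-thread generator does. ThreadCountSketch and countThreadsSpawned are hypothetical names introduced for illustration, and the threads here do no real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadCountSketch {

    // Starts one thread per part for every request and returns how many
    // threads actually ran.
    static int countThreadsSpawned(int requests, int partsPerRequest) {
        final AtomicInteger started = new AtomicInteger();
        List<Thread> threads = new ArrayList<Thread>();
        for (int r = 0; r < requests; r++) {
            for (int p = 0; p < partsPerRequest; p++) {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        started.incrementAndGet();
                    }
                });
                threads.add(thread);
                thread.start();
            }
        }
        // Wait for every thread to finish before reading the counter.
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return started.get();
    }

    public static void main(String[] args) {
        System.out.println(countThreadsSpawned(100, 5) + " threads were created");
    }
}
```

The threads in this sketch finish almost immediately, so few of them are alive at the same moment. With a 2-second I/O call inside each one, most of the 500 would exist at once.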

So thread creation has to be constrained in some way. I will go through two more approaches where threads are controlled, in the next blog entry.
