Java ExecutorService: awaitTermination of all recursively created tasks

concurrency executorservice java multithreading

I use an ExecutorService to execute a task. This task can recursively create other tasks which are submitted to the same ExecutorService and those child tasks can do that, too.

I now have the problem that I want to wait until all the tasks are done (that is, all tasks are finished and they did not submit new ones) before I continue.

I cannot call ExecutorService.shutdown() in the main thread because this prevents new tasks from being accepted by the ExecutorService.

And calling ExecutorService.awaitTermination() seems to do nothing if shutdown hasn't been called.

So I am kinda stuck here. It can't be that hard for the ExecutorService to see that all workers are idle, can it? The only inelegant solution I could come up with is to directly use a ThreadPoolExecutor and query its getPoolSize() every once in a while. Is there really no better way to do that?

Best Answer

This really is an ideal candidate for a Phaser. Java 7 is coming out with this new class. It's a flexible CountDownLatch/CyclicBarrier. You can get a stable version at the JSR 166 Interest Site.

It is more flexible than a CountDownLatch/CyclicBarrier because it not only supports an unknown number of parties (threads) but is also reusable (that's where the phase part comes in).

For each task you submit, you register; when that task completes, you arrive. This can be done recursively.
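To make the "unknown number of parties" and "reusable" points concrete, here is a minimal single-threaded sketch. The class name PhaserBasics and the method demo are made up for illustration; only the Phaser calls themselves (register, arrive, getPhase) come from java.util.concurrent.

```java
import java.util.concurrent.Phaser;

class PhaserBasics {
    // Hypothetical helper: returns the phase observed at three points in time.
    static int[] demo() {
        Phaser phaser = new Phaser();     // no parties registered up front
        int[] phases = new int[3];
        phases[0] = phaser.getPhase();    // initial phase

        // Parties can be added at any time, unlike a CountDownLatch count.
        phaser.register();
        phaser.register();
        phaser.arrive();                  // 1 of 2 arrived, phase unchanged
        phaser.arrive();                  // 2 of 2 arrived, phase advances
        phases[1] = phaser.getPhase();

        // The same Phaser is reused for the next phase, now with 3 parties.
        phaser.register();
        phaser.arrive();
        phaser.arrive();
        phaser.arrive();                  // all 3 arrived, phase advances again
        phases[2] = phaser.getPhase();
        return phases;
    }
}
```

Note that the parties registered in the first phase stay registered in the second, so the second phase needs three arrivals, not one.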

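import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           // register the child before submitting it, so the phase cannot advance early
           phaser.register();
           e.submit(this); // resubmit this same Runnable
      }

      // this task is done: count it as arrived
      phaser.arrive();
   }
};

public void doWork(){
   // remember the current phase before registering any work
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   // block until every registered task has arrived and the phase advances
   phaser.awaitAdvance(phase);
}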

Edit: Thanks @depthofreality for pointing out the race condition in my previous example. I have updated it so that the executing thread only awaits the advance of the current phase, blocking until the recursive work completes.

The phase won't advance until the number of arrivals equals the number of registrations. Since register is invoked before each recursive submission, the phase advances only once all invocations are complete.
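For anyone who wants something to compile and run, here is a self-contained sketch of the same register-before-submit idea. It is a variation, not the exact code above: it registers the waiting caller as a party with new Phaser(1), and each task calls arriveAndDeregister() in a finally block so a task that throws still counts as done. The class RecursiveTasks, the methods runTree and submit, the two-children-per-task workload, and the pool size of 4 are all assumptions made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class RecursiveTasks {
    // Hypothetical workload: every task above maxDepth spawns two children.
    // Returns how many tasks ran before the caller was released.
    static int runTree(int maxDepth) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1);            // 1 party = the waiting caller
        AtomicInteger executed = new AtomicInteger();
        try {
            submit(pool, phaser, executed, 0, maxDepth);
            // The caller arrives and waits until every task has deregistered.
            phaser.arriveAndAwaitAdvance();
            return executed.get();
        } finally {
            pool.shutdown();                      // safe now: no more submissions
        }
    }

    static void submit(ExecutorService pool, Phaser phaser,
                       AtomicInteger executed, int depth, int maxDepth) {
        phaser.register();                        // register BEFORE submitting
        pool.submit(() -> {
            try {
                executed.incrementAndGet();
                if (depth < maxDepth) {
                    // Children are registered while this task is still registered,
                    // so the phase cannot advance in between.
                    submit(pool, phaser, executed, depth + 1, maxDepth);
                    submit(pool, phaser, executed, depth + 1, maxDepth);
                }
            } finally {
                phaser.arriveAndDeregister();     // done, even if the work threw
            }
        });
    }
}
```

Calling RecursiveTasks.runTree(3) runs 1 + 2 + 4 + 8 = 15 tasks and returns only after the last one has finished. Because tasks deregister when they finish while the caller stays registered, the same Phaser could be used for another round afterwards.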