Java – Executors: How to synchronously wait until all tasks have finished if tasks are created recursively

concurrencyjavathreadpoolexecutor

My question is strongly related to this one here.
As was posted there, I would like the main thread to wait until the work queue is empty and all tasks have finished. The problem in my situation is, however, that each task may recursively cause new tasks to be submitted for processing. This makes it a little awkward to collect all of those tasks's futures.

Our current solution uses a busy-wait loop to await termination:

        do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());

numTasks is a value that is increased as every new task is created.
This works but I think it's not very nice due to the busy waiting. I was wondering whether there is a good way to make the main thread wait synchronously, until being explicitly woken up.

Best Answer

Thanks a lot for all your suggestions!

In the end I opted for something that I believe to be reasonably simple. I found out that CountDownLatch is almost what I need. It blocks until the counter reaches 0. The only problem is that it can only count down, not up, and thus does not work in the dynamic setting I have where tasks can submit new tasks. I hence implemented a new class CountLatch that offers additional functionality. (see below) This class I then use as follows.

Main thread calls latch.awaitZero(), blocking until latch reaches 0.

Any thread, before calling executor.execute(..) calls latch.increment().

Any task, just before completing, calls latch.decrement().

When the last task terminates, the counter will reach 0 and thus release the main thread.

Further suggestions and feedback are most welcome!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected int acquireNonBlocking(int acquires) {
        // increment count
        for (;;) {
            int c = getState();
            int nextc = c + 1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public CountLatch(int count) {
    this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
    sync.acquireNonBlocking(1);
}

public void decrement() {
    sync.releaseShared(1);
}

public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

}

Note that the increment()/decrement() calls can be encapsulated into a customized Executor subclass as was suggested, for instance, by Sami Korhonen, or with beforeExecute and afterExecute as was suggested by impl. See here:

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
    numRunningTasks.increment();
    super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    numRunningTasks.decrement();
    super.afterExecute(r, t);
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion() throws InterruptedException {
    numRunningTasks.awaitZero();
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    numRunningTasks.awaitZero(timeout, unit);
}

}
Related Topic