Here's an explanation and example of how this is accomplished. Let me know if there are parts that aren't clear.
Gist with source
Universal
Initialization:
Thread indexes are assigned in an atomically incremented fashion. This is managed using an AtomicInteger named nextIndex. These indexes are assigned to threads through a ThreadLocal instance which initializes itself by getting the next index from nextIndex and incrementing it. This happens the first time each thread's index is retrieved.
A ThreadLocal is created to track the last sequence this thread created; it is initialized to 0. The Sequential factory object reference is passed in and stored.
Two AtomicReferenceArray instances of size n are created, where n is the maximum number of threads allowed. The tail node, initialized with the initial state provided by the Sequential factory, is assigned to each element. Each element in these arrays 'belongs' to the corresponding thread index.
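Pulling that together, a minimal sketch of this setup might look like the following (class and field names beyond those mentioned above are assumptions, and the element type is left as Object for brevity; the actual Gist will differ):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch of the Universal class's state, based on the
// description above; the real source may differ in names and details.
class UniversalSketch {
    static final int N = 8; // n: maximum number of threads allowed

    // hands each thread a distinct index the first time it asks
    private final AtomicInteger nextIndex = new AtomicInteger(0);
    final ThreadLocal<Integer> threadIndex =
            ThreadLocal.withInitial(nextIndex::getAndIncrement);

    // last sequence number this thread created, initialized to 0
    final ThreadLocal<Long> lastSequence = ThreadLocal.withInitial(() -> 0L);

    // one slot per thread index; in the real code every slot starts out
    // pointing at the tail node built from the Sequential factory's state
    final AtomicReferenceArray<Object> announce = new AtomicReferenceArray<>(N);
    final AtomicReferenceArray<Object> head = new AtomicReferenceArray<>(N);
}
```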
Apply method:
This is the method that does the interesting work. It does the following:
- Create a new node for this invocation: mine
- Set this new node in the announce array at the current thread's index
Then the sequencing loop begins. It will continue until the current invocation has been sequenced:
1. Find a node in the announce array using the sequence of the last node created by this thread. More on this later.
2. If the node found in step 1 is not yet sequenced, continue with it; otherwise, just focus on the current invocation. This will only try to help one other node per invocation.
3. Whichever node was selected in step 2, keep trying to sequence it after the last sequenced node (other threads may interfere). Regardless of success, set the current thread's head reference to the node returned by decideNext().
The key to the sequencing loop described above is the decideNext() method. To understand that, we need to look at the Node class.
Node class
This class specifies nodes in a doubly-linked list. There's not a lot of action in this class. Most of the methods are simple retrieval methods that should be fairly self-explanatory.
tail method
This returns a special node instance with a sequence of 0. It simply acts as a placeholder until an invocation replaces it.
Properties and initialization
- seq: the sequence number, initialized to -1 (meaning unsequenced)
- invocation: the value of the invocation of apply(). Set upon construction.
- next: an AtomicReference for the forward link. Once assigned, this will never be changed.
- previous: an AtomicReference for the backward link, assigned upon sequencing and cleared by truncate().
Decide Next
This method is the only one in Node with non-trivial logic. In a nutshell, a node is offered as a candidate to be the next node in the linked list. The compareAndSet() method will check whether the next reference is null and, if so, set it to the candidate. If the reference is already set, it does nothing. This operation is atomic, so if two candidates are offered at the same moment, only one will be selected. This guarantees only one node will ever be selected as the next one. If the candidate node is selected, its sequence is set to the next value and its previous link is set to this node.
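The Node class and decideNext() could be sketched roughly like this (field and method shapes are inferred from the description above, not copied from the Gist):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical Node sketch; names follow the description in the text.
class Node<T> {
    volatile long seq = -1;          // -1 means unsequenced
    final T invocation;              // set upon construction
    final AtomicReference<Node<T>> next = new AtomicReference<>();     // never changes once set
    final AtomicReference<Node<T>> previous = new AtomicReference<>(); // cleared by truncate()

    Node(T invocation) { this.invocation = invocation; }

    static <T> Node<T> tail() {      // placeholder node with sequence 0
        Node<T> t = new Node<>(null);
        t.seq = 0;
        return t;
    }

    // Offer 'candidate' as the next node; only one candidate can ever win,
    // because compareAndSet(null, candidate) succeeds at most once.
    Node<T> decideNext(Node<T> candidate) {
        if (next.compareAndSet(null, candidate)) {
            candidate.seq = this.seq + 1;      // sequence the winner
            candidate.previous.set(this);      // backward link, until truncate()
        }
        return next.get();                     // whichever node actually won
    }

    void truncate() { previous.set(null); }    // drop the backward link
}
```

Note how the return value of decideNext() is always the winning node, whether or not it was the caller's candidate.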
Jumping Back to the Universal class apply method ...
Having called decideNext() on the last sequenced node (as of when it was checked) with either our node or a node from the announce array, there are two possible outcomes:
1. The node was successfully sequenced.
2. Some other thread pre-empted this thread.
The next step is to check whether the node created for this invocation has been sequenced. This could happen because this thread successfully sequenced it or because some other thread picked it up from the announce array and sequenced it for us. If it has not been sequenced, the process is repeated. Otherwise the call finishes up by clearing the announce array at this thread's index and returning the result value of the invocation. The announce array is cleared to guarantee there are no references left that would prevent the node from being garbage collected and therefore keep all nodes in the linked list from that point on alive on the heap.
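A compressed, single-file sketch of the sequencing loop, with a stripped-down stand-in Node, might look like this (the class name ApplySketch and the per-thread bookkeeping are simplified assumptions; in the real code the head reference is stored per thread in the head array):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch of the sequencing loop in apply().
class ApplySketch {
    static class Node {
        volatile long seq = -1;
        final String invocation;
        final AtomicReference<Node> next = new AtomicReference<>();
        Node(String invocation) { this.invocation = invocation; }
        Node decideNext(Node candidate) {
            if (next.compareAndSet(null, candidate)) candidate.seq = seq + 1;
            return next.get();
        }
    }

    final int n = 3; // maximum number of threads
    final AtomicReferenceArray<Node> announce = new AtomicReferenceArray<>(n);
    final ThreadLocal<Long> lastSequence = ThreadLocal.withInitial(() -> 0L);
    volatile Node lastSequenced = sequencedTail();

    static Node sequencedTail() { Node t = new Node(null); t.seq = 0; return t; }

    String apply(int myIndex, String invocation) {
        Node mine = new Node(invocation);
        announce.set(myIndex, mine);                 // publish for helpers
        while (mine.seq == -1) {
            // 1. look for a node another thread may need help with
            Node candidate = announce.get((int) (lastSequence.get() % n));
            // 2. if that node is already sequenced, work on our own instead
            if (candidate == null || candidate.seq != -1) candidate = mine;
            // 3. try to append it after the last sequenced node
            Node winner = lastSequenced.decideNext(candidate);
            lastSequenced = winner;
            lastSequence.set(winner.seq);
        }
        announce.set(myIndex, null);                 // let the node be GC'd later
        return mine.invocation;
    }
}
```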
Evaluate method
Now that the invocation's node has been successfully sequenced, the invocation needs to be evaluated. To do that, the first step is to ensure that the invocations preceding this one have been evaluated. If they haven't, this thread will not wait but will do that work itself immediately.
EnsurePrior method
The ensurePrior() method does this work by checking the previous node in the linked list. If its state is not set, the previous node will be evaluated. Note that this is recursive: if the node prior to the prior node has not been evaluated, evaluate will be called for that node, and so on and so forth.
Now that the previous node is known to have a state, we can evaluate this node. The previous node reference is retrieved and assigned to a local variable. If this reference is null, it means that some other thread has pre-empted this one and already evaluated this node, setting its state. Otherwise, the prior node's state is passed to the Sequential object's apply method along with this node's invocation. The state returned is set on the node, and the truncate() method is called, clearing the backward link from the node as it is no longer needed.
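As a rough illustration of the evaluate()/ensurePrior() recursion, the Sequential object is modeled below as a simple BinaryOperator&lt;Integer&gt; combining state and invocation (a stand-in, not the actual interface from the Gist):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;

// Hypothetical sketch of evaluate()/ensurePrior().
class EvalSketch {
    static class Node {
        final Integer invocation;
        volatile Integer state;                       // null until evaluated
        final AtomicReference<Node> previous = new AtomicReference<>();
        Node(Integer invocation) { this.invocation = invocation; }
        void truncate() { previous.set(null); }       // backward link no longer needed
    }

    final BinaryOperator<Integer> sequential;         // e.g. (state, inv) -> state + inv
    EvalSketch(BinaryOperator<Integer> sequential) { this.sequential = sequential; }

    void evaluate(Node node) {
        ensurePrior(node);
        Node prior = node.previous.get();
        if (prior == null) return;                    // someone else already evaluated us
        node.state = sequential.apply(prior.state, node.invocation);
        node.truncate();                              // clear the backward link
    }

    // Recursively make sure every preceding node has a state before this one runs.
    void ensurePrior(Node node) {
        Node prior = node.previous.get();
        if (prior != null && prior.state == null) evaluate(prior);
    }
}
```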
MoveForward method
The moveForward method will attempt to move all head references to this node if they are not already pointing to something further along. This ensures that if a thread stops calling, its head will not retain a reference to a node that is no longer needed. The compareAndSet() method makes sure we only update a reference if some other thread hasn't changed it since it was retrieved.
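A hedged sketch of moveForward() over an array of head references (the Node here carries only a sequence number; the class name is an assumption):

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch: advance every head reference to 'node' unless it
// already points at something with a higher sequence.
class MoveForwardSketch {
    static class Node {
        final long seq;
        Node(long seq) { this.seq = seq; }
    }

    final AtomicReferenceArray<Node> head;
    MoveForwardSketch(int n) { head = new AtomicReferenceArray<>(n); }

    void moveForward(Node node) {
        for (int i = 0; i < head.length(); i++) {
            Node current = head.get(i);
            // only advance; compareAndSet leaves the slot alone if another
            // thread changed it after we read 'current'
            while (current == null || current.seq < node.seq) {
                if (head.compareAndSet(i, current, node)) break;
                current = head.get(i);
            }
        }
    }
}
```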
Announce array and helping
The key to making this approach wait-free, as opposed to merely lock-free, is that we can't assume the thread scheduler will give each thread priority when it needs it. If each thread simply attempted to sequence its own nodes, it's possible that a thread could be continually pre-empted under load. To account for this possibility, each thread will first try to 'help' other threads that may be unable to get sequenced.
The basic idea is that as each thread successfully creates nodes, the sequences assigned are monotonically increasing. If a thread or threads are continually pre-empting another thread, the index they use to find unsequenced nodes in the announce array will move forward. Even if every thread that is currently trying to sequence a given node is continually pre-empted by another thread, eventually all threads will be trying to sequence that node. To illustrate, we'll construct an example with three threads.
At the starting point, all three threads' head and announce elements are pointed at the tail node. The lastSequence for each thread is 0.
At this point, Thread 1 is executed with an invocation. It checks the announce array at its last sequence (zero), which indexes the node it just announced. It sequences the node, and its lastSequence is set to 1.
Thread 2 is now executed with an invocation. It checks the announce array at its last sequence (zero), sees that it doesn't need to help, and so attempts to sequence its own invocation. It succeeds, and now its lastSequence is set to 2.
Thread 3 is now executed, and it also sees that the node at announce[0] is already sequenced, so it sequences its own invocation. Its lastSequence is now set to 3.
Now Thread 1 is invoked again. It checks the announce array at index 1 and finds that the node there is already sequenced. Concurrently, Thread 2 is invoked. It checks the announce array at index 2 and finds that that node is already sequenced too. Both Thread 1 and Thread 2 now attempt to sequence their own nodes. Thread 2 wins and sequences its invocation; its lastSequence is set to 4. Meanwhile, Thread 3 has been invoked. It checks the index at its lastSequence (mod 3) and finds that the node at announce[0] has not been sequenced. Thread 2 is again invoked at the same time that Thread 1 is on its second attempt. Thread 1 finds an unsequenced invocation at announce[1], which is the node just created by Thread 2. It attempts to sequence Thread 2's invocation and succeeds. Thread 2 finds its own node at announce[1] already sequenced, so it sets its lastSequence to 5. Thread 3 is then invoked and finds that the node Thread 1 placed at announce[0] is still not sequenced, and attempts to sequence it. Meanwhile, Thread 2 has also been invoked and pre-empts Thread 3. It sequences its node and sets its lastSequence to 6.
Poor Thread 1. Even though Thread 3 is trying to sequence its node, both threads have been continually thwarted by the scheduler. But at this point, Thread 2 is also now pointing to announce[0] (6 mod 3). All three threads are set to attempt to sequence the same invocation. No matter which thread succeeds, the next node to be sequenced will be the waiting invocation of Thread 1, i.e. the node referenced by announce[0].
This is inevitable. In order for threads to be pre-empted, other threads must be sequencing nodes, and as they do so, they will continually move their lastSequence ahead. If a given thread's node is continually left unsequenced, eventually all the threads will be pointing to its index in the announce array. Since no thread will do anything else until the node it is trying to help has been sequenced, the worst-case scenario is that all threads are pointing to the same unsequenced node. Therefore, the time required to sequence any invocation is a function of the number of threads and not the size of the input.
This is not a question of performance. It is first and foremost a question of correctness. If you have two lock statements, you cannot guarantee atomicity for operations that are spread between them, or partially outside the lock statement. Tailored to the old version of your code, this means:
Between the end of the while (_runningWorkers >= MaxSimultaneousThreads) loop and the _runningWorkers++, anything at all may happen, because the code surrenders and re-acquires the lock in between. For example, thread A might acquire the lock for the first time, wait until some other thread exits, and then break out of the loop and the lock. It is then preempted, and thread B enters the picture, also waiting for room in the thread pool. Because said other thread quit, there is room, so it doesn't wait very long at all. Both thread A and thread B now go on in some order, each incrementing _runningWorkers and starting their work.
Now, there are no data races as far as I can see, but logically it's wrong, since there are now more than MaxSimultaneousThreads workers running. The check is (occasionally) ineffective because the task of taking a slot in the thread pool is not atomic. This should concern you more than small optimizations around lock granularity! (Note that, conversely, locking too early or for too long can easily lead to deadlocks.)
The second snippet fixes this problem, as far as I can see.
A less invasive change to fix the problem might be putting the ++_runningWorkers right after the while loop, inside the first lock statement.
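In Java terms, the fixed pattern might look like this hypothetical sketch, with the check and the increment under one monitor so that taking a slot is atomic (the class and field names are invented for illustration):

```java
// Hypothetical Java rendering of the fix: the wait loop and the increment
// share one monitor, so no other thread can sneak in between them.
class BoundedWorkers {
    private final int maxSimultaneousThreads;
    private int runningWorkers = 0;

    BoundedWorkers(int maxSimultaneousThreads) {
        this.maxSimultaneousThreads = maxSimultaneousThreads;
    }

    void acquireSlot() {
        synchronized (this) {
            while (runningWorkers >= maxSimultaneousThreads) {
                try {
                    wait();                          // releases the monitor while blocked
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // sketch: keep waiting, preserve status
                }
            }
            runningWorkers++;                        // still inside the same lock: no lost check
        }
    }

    synchronized void releaseSlot() {
        runningWorkers--;
        notifyAll();                                 // wake threads waiting for a slot
    }

    synchronized int running() { return runningWorkers; }
}
```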
Now, correctness aside, what about performance? This is hard to tell. Generally locking for a longer time ("coarsely") inhibits concurrency, but as you say, this needs to be balanced against the overhead from the additional synchronization of fine-grained locking. Generally the only solution is benchmarking and being aware that there are more options than "lock everything everywhere" and "lock only the bare minimum". There is a wealth of patterns and concurrency primitives and thread-safe data structures available. For example, this seems like the very application semaphores were invented for, so consider using one of those instead of this hand-rolled hand-locked counter.
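For instance, in Java a java.util.concurrent.Semaphore expresses the bound directly (a sketch, not the original code; the class name is invented):

```java
import java.util.concurrent.Semaphore;

// Sketch: a counting semaphore replaces the hand-rolled counter entirely.
class SemaphoreBound {
    private final Semaphore slots;

    SemaphoreBound(int maxSimultaneousThreads) {
        slots = new Semaphore(maxSimultaneousThreads);
    }

    void runBounded(Runnable work) {
        slots.acquireUninterruptibly(); // blocks until a slot is free; no polling loop
        try {
            work.run();
        } finally {
            slots.release();            // always give the slot back
        }
    }

    int freeSlots() { return slots.availablePermits(); }
}
```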
Best Answer
The operating system provides certain primitives for this kind of interprocess communication that don't require polling.
If process A is waiting on mutex M, the OS knows A can't be run and puts it aside in a bucket of processes waiting for something to happen. When the process holding M releases it, the OS looks at the list of processes waiting for it. The first process on the list, maybe A, gets removed from the idle bucket and put on the run queue. The next time A gets a time slice, the wait() it called will return and the program continues.