A "universal construction" is a wrapper class for a sequential object that enables it to be linearized (a strong consistency condition for concurrent objects). For instance, here's an adapted wait-free construction, in Java, from [1], which presumes the existence of a wait-free queue that satisfies the interface WFQ
(which only requires one-time consensus between threads) and assumes a Sequential
interface:
public interface WFQ<T> // "FIFO" iteration
{
int enqueue(T t); // returns the sequence number of t
Iterable<T> iterateUntil(int max); // iterates until sequence max
}
public interface Sequential
{
// Apply an invocation (method + arguments)
// and get a response (return value + state)
Response apply(Invocation i);
}
public interface Factory<T> { T generate(); } // generate new default object
public interface Universal extends Sequential {}
public class SlowUniversal implements Universal
{
Factory<? extends Sequential> generator;
WFQ<Invocation> wfq = new WFQ<Invocation>();
Universal(Factory<? extends Sequential> g) { generator = g; }
public Response apply(Invocation i)
{
int max = wfq.enqueue(i);
Sequential s = generator.generate();
for(Invocation invoc : wfq.iterateUntil(max))
s.apply(invoc);
return s.apply(i);
}
}
This implementation isn't very satisfying since it is really slow (you remember every invocation, and have to replay it at every apply – we have linear runtime in the history size). Is there any way that we could extend the WFQ
and Sequential
interfaces (in reasonable ways) to enable us to save some steps when applying a new invocation?
Can we make this more efficient (not linear runtime in history size, preferably memory usage goes down too) without losing the wait-free property?
Clarification
A "universal construction" is a term I'm pretty sure was made up by [1] which accepts a thread-unsafe but thread-compatible object, which is generalized by the Sequential
interface. Using a wait-free queue, the first construction offers a thread-safe, linearizable version of the object that's also wait-free (this assumes determinism and halting apply
operations).
This is inefficient, since the method is effectively to have each local thread start from a clean slate and applies every operation ever recorded to it. In any case, this works because it achieves synchronization effectively by using the WFQ
to determine the order in which all operations should be applied: every thread calling apply
will see the same local Sequential
object, with the same sequence of Invocation
s applied to it.
My question is whether we can (e.g.) introduce a background cleanup process which updates the "starting state" so that we don't have to restart from scratch. This isn't as simple as having an atomic pointer with a starting pointer – these kinds of approaches easily lose the wait-free guarantee. My suspicion is that some other queue-based approach might work here.
Jargon:
- wait-free – regardless of the number of threads or the scheduler's decision making,
apply
will terminate in a provably bounded number of instructions executed for that thread. - lock-free – same as above, but admits the possibility of an unbounded execution time, only in the case that an unbounded number of
apply
operations are getting done in other threads. Typically, optimistic synchronization schemes fall into this category. - blocking – efficiency at the mercy of the scheduler.
A working example, as requested (now on a page that won't expire)
[1] Herlihy and Shavit, The Art of Multiprocessor Programming.
Best Answer
Here's an explanation and example of how this is accomplished. Let me know if there are parts that aren't clear.
Gist with source
Universal
Initialization:
Thread indexes are applied in a atomically incremented fashion. This is managed using an
AtomicInteger
namednextIndex
. These indexes are assigned to threads through aThreadLocal
instance which initializes itself by getting the next index fromnextIndex
and incrementing it. This happens the first time each thread's index is retrieved the first time. AThreadLocal
is created to track the last sequence this thread created. It's initialized 0. The sequential factory object reference is passed in and stored. TwoAtomicReferenceArray
instances are created of sizen
. The tail object is assigned to each reference, having been initialized with the initial state provided by theSequential
factory.n
is the maximum number of threads allowed. Each element in these arrays 'belongs' to corresponding thread index.Apply method:
This is the method that does the interesting work. It does the following:
Then the sequencing loop begins. It will continue until the current invocation has been sequenced:
decideNext()
The key to the nested loop described above is the
decideNext()
method. To understand that, we need to look at the Node class.Node class
This class specifies nodes in a doubly-linked list. There's not a lot of action in this class. Most of the methods are simple retrieval methods that should be fairly self-explanatory.
tail method
this returns a special node instance with a sequence of 0. It simply acts as a place-holder until an invocation replaces it.
Properties and initialization
seq
: the sequence number, initialized to -1 (meaning unsequenced)invocation
: the value of the invocation ofapply()
. Set upon construction.next
:AtomicReference
for the forward link. once assigned, this will never be changedprevious
:AtomicReference
for the backward link assigned upon sequencing and cleared bytruncate()
Decide Next
This method is only one in Node with non-trivial logic. In a nutshell, a node is offered as a candidate to be the next node in the linked list. The
compareAndSet()
method will check if it's reference is null and if so, set the reference to the candidate. If the reference is already set, it does nothing. This operation is atomic so if two candidates are offered at the same moment, only one will be selected. This guarantees only one node will ever be selected as the next one. If the candidate node is selected, it's sequence is set to the next value, and it's previous link is set to this node.Jumping Back to the Universal class apply method ...
Having called
decideNext()
on the last sequenced node (when checked) with either our node or a node from theannounce
array, there are two possible occurrences: 1. The node was successfully sequenced 2. Some other thread pre-empted this thread.The next step is to check whether the node created for this invocation. This could happen because this thread successfully sequenced it or some other thread picked it up from the
announce
array and sequenced it for us. If it has not been sequenced, the process is repeated. Otherwise the call finishes up by clearing the announce array for at this thread's index and returning the result value of the invocation. The announce array is cleared to guarantee there are no references to the node left around that would prevent the node from being garbage collected and therefore keep all nodes in the linked list from that point on alive on the heap.Evaluate method
Now that the invocation's node has been successfully sequenced, the invocation needs to be evaluated. To do that, the first step is to ensure that the invocations preceding this one have been evaluated. If they haven't this thread will not wait but will do that work immediately.
EnsurePrior method
The
ensurePrior()
method does this work by checking the previous node in the linked list. If it's state is not set, the previous node will be evaluated. Node that this is recursive. If the node prior to prior node has not been evaluated, it will call evaluate for that node and so on an so forth.Now that the previous node is known to have a state, we can evaluate this node. The last node is retrieved and assigned to a local variable. If this reference is null, it means that some other thread has pre-empted this one and already evaluated this node; setting it's state. Otherwise, the prior node's state is passed to the
Sequential
object's apply method along with this node's invocation. The state returned is set on the node and thetruncate()
method is called, clearing the backward link from the node as it is no longer needed.MoveForward method
The move forward method will attempt to move all head references to this node if they are not already pointing to something further along. This is to ensure that if a thread stops calling, it's head will not retain a reference to an node that is no longer needed. The
compareAndSet()
method will make sure we only update the node if some other thread hasn't changed it since it was retrieved.Announce array and helping
The key to making this approach wait-free as opposed to simply lock-free is that we can't assume that the thread scheduler will give each thread priority when it needs it. If each thread simply attempted to sequence it's own nodes, it's possible that a thread could be continually pre-empted under load. To account for this possibility each thread will first try to 'help' other threads that may be unable to get sequenced.
The basic idea is that as each thread successfully creates nodes, the sequences assigned are monotonically increasing. If a thread or threads are continually pre-empting another thread, the index the use to find unsequenced nodes in the
announce
array will move forward. Even if every thread that is currently trying to sequence a given node is continually pre-empted by another thread, eventually all threads will be trying to sequence that node. To illustrate, we'll construct an example with three threads.At the starting point, all three threads' head and announce elements are pointed at the
tail
node. ThelastSequence
for each thread is 0.At this point, Thread 1 is executed with an invocation. It checks the announce array for it's last sequence (zero) which is the node it is currently scheduled to index. It sequences the node and it's
lastSequence
is set to 1.Thread 2 is now executed with an invocation, it checks the announce array at it's last sequence (zero) and sees that it doesn't need help and so attempts to sequence it's invocation. It succeeds and now it's
lastSequence
is set to 2.Thread 3 is now executed and it also sees that the node at
announce[0]
is already sequenced and sequences it's own invocation. It'slastSequence
is now set to 3.Now Thread 1 is invoked again. It checks the announce array at index 1 and finds that it's already sequenced. Concurrently, Thread 2 is invoked. It checks the announce array at index 2 and finds that it is already sequenced. Both Thread 1 and Thread 2 now attempt to sequence their own nodes. Thread 2 wins and it sequences it's invocation. It's
lastSequence
is set to 4. Meanwhile, thread three has been invoked. It checks the index itlastSequence
(mod 3) and finds that the node atannounce[0]
has not been sequenced. Thread 2 is again invoked at the same time that Thread 1 is on it's second attempt. Thread 1 finds an unsequenced invocation atannounce[1]
which is the node just created by Thread 2. It attempts to sequence Thread 2's invocation and succeeds. Thread 2 finds it's own node atannounce[1]
and it has been sequenced. It set's it'slastSequence
to 5. Thread 3 is then invoked and finds that node that thread 1 placed atannounce[0]
is still not sequenced and attempts to do so. Meanwhile Thread 2 has also been invoked and pre-empts Thread 3. It sequences it's node and sets it'slastSequence
to 6.Poor Thread 1. Even though Thread 3 is trying to sequence it, both threads have been continually thwarted by the scheduler. But at this point. Thread 2 is also now pointing to
announce[0]
(6 mod 3). All three threads are set to attempt to sequence the same invocation. No matter which thread succeeds, the next node to be sequenced will be the waiting invocation of Thread 1 i.e. the node referenced byannounce[0]
.This is inevitable. In order for threads to be pre-empted, other threads must be sequencing nodes and as they do so, they will continually move their
lastSequence
ahead. If a given thread's node is continually not sequenced, eventually all the threads will be pointing to it's index in the announce array. No thread will do anything else until the node it is trying to help has been sequenced, the worst case scenario is that all threads are pointing to the same unsequenced node. Therefore, the time required to sequence any invocation is a function of the number of threads and not the size of the input.