Java – How to make a universal construction more efficient

concurrency, java, multithreading

A "universal construction" is a wrapper class for a sequential object that makes it linearizable (linearizability being a strong consistency condition for concurrent objects). For instance, here's an adapted wait-free construction, in Java, from [1], which presumes the existence of a wait-free queue that satisfies the interface WFQ (which only requires one-time consensus between threads) and assumes a Sequential interface:

public interface WFQ<T> // "FIFO" iteration
{
    int enqueue(T t); // returns the sequence number of t
    Iterable<T> iterateUntil(int max); // iterates until sequence max
}
public interface Sequential
{
    // Apply an invocation (method + arguments)
    // and get a response (return value + state)
    Response apply(Invocation i); 
}
public interface Factory<T> { T generate(); } // generate new default object
public interface Universal extends Sequential {}

public class SlowUniversal implements Universal
{
    Factory<? extends Sequential> generator;
    WFQ<Invocation> wfq = /* some wait-free implementation */ null;
    SlowUniversal(Factory<? extends Sequential> g) { generator = g; }
    public Response apply(Invocation i)
    {
        int max = wfq.enqueue(i);
        Sequential s = generator.generate();
        for(Invocation invoc : wfq.iterateUntil(max))
            s.apply(invoc);
        return s.apply(i);
    }
}

This implementation isn't very satisfying since it is really slow: every invocation is remembered, and the entire history has to be replayed on every apply, so runtime is linear in the history size. Is there any way we could extend the WFQ and Sequential interfaces (in reasonable ways) to save some steps when applying a new invocation?
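To make the replay cost concrete, here is a toy, single-file sketch of the same idea with a trivial counter as the Sequential object. Note this is NOT wait-free: a synchronized list stands in for the WFQ purely to illustrate the linear-replay problem; all names here are illustrative, not from the original code.

```java
import java.util.*;

// Toy illustration of the replay cost in the SlowUniversal approach.
class SlowDemo {
    // the "Sequential" object: a thread-unsafe counter
    static class Counter {
        int value = 0;
        int apply(int amount) { value += amount; return value; }
    }

    // stand-in for the WFQ (not wait-free; for illustration only)
    static final List<Integer> log = Collections.synchronizedList(new ArrayList<>());

    // Every apply() replays the entire history into a fresh Counter:
    // O(history) time per call, and the log grows without bound.
    static int apply(int amount) {
        int max;
        synchronized (log) { log.add(amount); max = log.size() - 1; }
        Counter c = new Counter();
        int result = 0;
        for (int k = 0; k <= max; k++) result = c.apply(log.get(k));
        return result;
    }
}
```

Each call pays for the whole history, which is exactly the cost the question is asking how to avoid.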

Can we make this more efficient (not linear runtime in history size, preferably memory usage goes down too) without losing the wait-free property?

Clarification

A "universal construction" is a term I'm pretty sure was coined by [1]. It accepts a thread-unsafe but thread-compatible object, which is generalized here by the Sequential interface. Using a wait-free queue, the first construction offers a thread-safe, linearizable version of the object that's also wait-free (this assumes determinism and halting apply operations).

This is inefficient, since each thread effectively starts from a clean slate and applies every operation ever recorded. Still, it works because it achieves synchronization by using the WFQ to determine the order in which all operations are applied: every thread calling apply will see the same local Sequential object, with the same sequence of Invocations applied to it.

My question is whether we can (e.g.) introduce a background cleanup process which updates the "starting state" so that we don't have to restart from scratch. This isn't as simple as keeping an atomic pointer to a starting state: approaches like that easily lose the wait-free guarantee. My suspicion is that some other queue-based approach might work here.

Jargon:

  1. wait-free – regardless of the number of threads or the scheduler's decision making, apply will terminate in a provably bounded number of instructions executed for that thread.
  2. lock-free – same as above, but admits the possibility of an unbounded execution time, only in the case that an unbounded number of apply operations are getting done in other threads. Typically, optimistic synchronization schemes fall into this category.
  3. blocking – efficiency at the mercy of the scheduler.
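To illustrate the lock-free/wait-free distinction from the jargon list, here is a small counter with both styles (a sketch; the "wait-free" claim for the second method depends on the platform mapping it to an atomic fetch-and-add instruction):

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    // Lock-free: a thread may loop indefinitely, but only because some
    // other thread's CAS keeps succeeding, so system-wide progress holds.
    // This is the classic optimistic synchronization pattern.
    int incrementAndGetOptimistic() {
        while (true) {
            int current = value.get();
            if (value.compareAndSet(current, current + 1)) return current + 1;
        }
    }

    // Wait-free on most platforms: incrementAndGet compiles down to a
    // single hardware fetch-and-add, bounded regardless of contention.
    int incrementAndGetWaitFree() { return value.incrementAndGet(); }
}
```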

A working example, as requested (now on a page that won't expire)

[1] Herlihy and Shavit, The Art of Multiprocessor Programming.

Best Answer

Here's an explanation and example of how this is accomplished. Let me know if there are parts that aren't clear.

Gist with source

Universal

Initialization:

  • Thread indexes are assigned atomically using an AtomicInteger named nextIndex. Indexes are handed out through a ThreadLocal instance which initializes itself by getting the next index from nextIndex and incrementing it; this happens the first time each thread's index is retrieved.
  • Another ThreadLocal is created to track the last sequence this thread observed. It's initialized to 0.
  • The Sequential factory object reference is passed in and stored.
  • Two AtomicReferenceArray instances of size n are created, where n is the maximum number of threads allowed; each element 'belongs' to the corresponding thread index. The tail node, initialized with the initial state provided by the Sequential factory, is assigned to every element of both arrays.
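That initialization could be sketched as follows. The names are illustrative, not the answer's exact source; a bare Node placeholder stands in for the real Node class described later, and the Sequential factory is omitted for brevity:

```java
import java.util.concurrent.atomic.*;

// Illustrative sketch of the Universal construction's initialization.
class UniversalSketch {
    static class Node {                 // placeholder for the real Node class
        final long seq;
        Node(long seq) { this.seq = seq; }
        static Node tail() { return new Node(0); }  // seq-0 placeholder
    }

    final int n;                                    // max number of threads
    final AtomicInteger nextIndex = new AtomicInteger(0);
    // each thread lazily claims the next index on first access
    final ThreadLocal<Integer> threadIndex =
        ThreadLocal.withInitial(nextIndex::getAndIncrement);
    final ThreadLocal<Long> lastSequence = ThreadLocal.withInitial(() -> 0L);
    final AtomicReferenceArray<Node> announce;      // one slot per thread
    final AtomicReferenceArray<Node> heads;         // one slot per thread

    UniversalSketch(int n) {
        this.n = n;
        announce = new AtomicReferenceArray<>(n);
        heads = new AtomicReferenceArray<>(n);
        Node tail = Node.tail();
        for (int i = 0; i < n; i++) {               // everyone starts at tail
            announce.set(i, tail);
            heads.set(i, tail);
        }
    }
}
```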

Apply method:

This is the method that does the interesting work. It does the following:

  • Create a new node for this invocation: mine
  • Set this new node in the announce array at the current thread's index

Then the sequencing loop begins. It will continue until the current invocation has been sequenced:

  1. Find a node in the announce array using the sequence of the last node created by this thread. More on this later.
  2. If the node found in step 1 is not yet sequenced, continue with it; otherwise, just focus on the current invocation. This will only try to help one other node per invocation.
  3. Whichever node was selected in step 2, keep trying to sequence it after the last sequenced node (other threads may interfere). Regardless of success, set the current thread's head reference to the node returned by decideNext().

The key to the nested loop described above is the decideNext() method. To understand that, we need to look at the Node class.

Node class

This class specifies nodes in a doubly-linked list. There's not a lot of action in this class. Most of the methods are simple retrieval methods that should be fairly self-explanatory.

tail method

This returns a special node instance with a sequence of 0. It simply acts as a place-holder until an invocation replaces it.

Properties and initialization

  • seq: the sequence number, initialized to -1 (meaning unsequenced)
  • invocation: the value of the invocation of apply(). Set upon construction.
  • next: AtomicReference for the forward link. Once assigned, this will never be changed.
  • previous: AtomicReference for the backward link assigned upon sequencing and cleared by truncate()

Decide Next

This method is the only one in Node with non-trivial logic. In a nutshell, a node is offered as a candidate to be the next node in the linked list. The compareAndSet() method will check if its reference is null and, if so, set the reference to the candidate. If the reference is already set, it does nothing. This operation is atomic, so if two candidates are offered at the same moment, only one will be selected. This guarantees only one node will ever be selected as the next one. If the candidate node is selected, its sequence is set to the next value, and its previous link is set to this node.
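A minimal sketch of that logic, with illustrative names (not the answer's exact source): the CAS on next guarantees exactly one candidate wins.

```java
import java.util.concurrent.atomic.*;

// Sketch of the Node class's decideNext() idea.
class Node {
    final AtomicLong seq = new AtomicLong(-1);          // -1 = unsequenced
    final AtomicReference<Node> next = new AtomicReference<>();
    final AtomicReference<Node> previous = new AtomicReference<>();

    // Offer a candidate as this node's successor. Returns the unique
    // winner, whether or not it is our candidate.
    Node decideNext(Node candidate) {
        if (next.compareAndSet(null, candidate)) {      // only one offer wins
            candidate.seq.set(seq.get() + 1);           // assign next sequence
            candidate.previous.set(this);               // set backward link
        }
        return next.get();
    }
}
```

A losing candidate is left untouched (seq still -1), so its thread will simply retry against the newly sequenced node.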

Jumping Back to the Universal class apply method ...

Having called decideNext() on the last sequenced node with either our own node or a node from the announce array, there are two possible outcomes: 1. the node was successfully sequenced, or 2. some other thread pre-empted this thread.

The next step is to check whether the node created for this invocation has been sequenced. This could happen because this thread successfully sequenced it, or because some other thread picked it up from the announce array and sequenced it for us. If it has not been sequenced, the process is repeated. Otherwise the call finishes up by clearing the announce array at this thread's index and returning the result value of the invocation. The announce array is cleared to guarantee there are no lingering references to the node that would prevent it from being garbage collected and therefore keep all nodes in the linked list from that point on alive on the heap.

Evaluate method

Now that the invocation's node has been successfully sequenced, the invocation needs to be evaluated. To do that, the first step is to ensure that the invocations preceding this one have been evaluated. If they haven't, this thread will not wait but will do that work immediately.

EnsurePrior method

The ensurePrior() method does this work by checking the previous node in the linked list. If its state is not set, the previous node will be evaluated. Note that this is recursive: if the node prior to the prior node has not been evaluated, it will call evaluate for that node, and so forth.

Now that the previous node is known to have a state, we can evaluate this node. The last node is retrieved and assigned to a local variable. If this reference is null, it means that some other thread has pre-empted this one and already evaluated this node, setting its state. Otherwise, the prior node's state is passed to the Sequential object's apply method along with this node's invocation. The state returned is set on the node, and the truncate() method is called, clearing the backward link from the node as it is no longer needed.
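The evaluate()/ensurePrior() recursion can be sketched like this, simplified from the prose above: a plain int stands in for the real Sequential state, names are illustrative, and the sketch is only meant to show the single-threaded catch-up logic, not the full concurrent protocol.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of evaluate()/ensurePrior(): recursively evaluate predecessors,
// then fold this node's invocation into the prior state.
class EvalNode {
    final int invocation;                 // the "Invocation": an amount to add
    EvalNode previous;                    // backward link, cleared by truncate
    final AtomicReference<Integer> state = new AtomicReference<>();

    EvalNode(int invocation, EvalNode previous) {
        this.invocation = invocation;
        this.previous = previous;
    }

    void ensurePrior() {
        // recursive catch-up: evaluate the chain of unevaluated predecessors
        if (previous != null && previous.state.get() == null)
            previous.evaluate();
    }

    void evaluate() {
        ensurePrior();
        if (state.get() != null) return;  // another thread already did this
        int priorState = previous == null ? 0 : previous.state.get();
        state.compareAndSet(null, priorState + invocation);
        previous = null;                  // truncate(): drop the backward link
    }
}
```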

MoveForward method

The moveForward method will attempt to move all head references to this node if they are not already pointing to something further along. This ensures that if a thread stops calling, its head will not retain a reference to a node that is no longer needed. The compareAndSet() method makes sure we only update the head if some other thread hasn't changed it since it was retrieved.
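A sketch of that loop, with illustrative names: advance every head that is behind the given node, but never move one backward.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch of moveForward(): stalled threads must not pin old nodes.
class Heads {
    static class Node {
        final long seq;
        Node(long seq) { this.seq = seq; }
    }

    static void moveForward(AtomicReferenceArray<Node> heads, Node node) {
        for (int i = 0; i < heads.length(); i++) {
            Node current = heads.get(i);
            while (current.seq < node.seq) {
                // only replace if no one advanced it further in the meantime
                if (heads.compareAndSet(i, current, node)) break;
                current = heads.get(i);
            }
        }
    }
}
```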

Announce array and helping

The key to making this approach wait-free, as opposed to simply lock-free, is that we can't assume the thread scheduler will give each thread priority when it needs it. If each thread simply attempted to sequence its own nodes, it's possible that a thread could be continually pre-empted under load. To account for this possibility, each thread will first try to 'help' other threads that may be unable to get sequenced.

The basic idea is that as each thread successfully creates nodes, the sequences assigned are monotonically increasing. If a thread or threads are continually pre-empting another thread, the index they use to find unsequenced nodes in the announce array will move forward. Even if every thread that is currently trying to sequence a given node is continually pre-empted by another thread, eventually all threads will be trying to sequence that node. To illustrate, we'll construct an example with three threads.
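The helping lookup itself is just a modular index into the announce array. A sketch, with illustrative names: as lastSequence grows, the scanned slot cycles through every thread's index, so a starving thread's node is eventually examined by everyone.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch of the per-invocation helping choice.
class Helping {
    static class Node {
        volatile long seq = -1;                 // -1 = not yet sequenced
        boolean isSequenced() { return seq >= 0; }
    }

    // Help the announced node at (lastSequence mod n) if it still needs
    // sequencing; otherwise work on our own node. At most one other node
    // is helped per invocation.
    static Node choose(AtomicReferenceArray<Node> announce,
                       long lastSequence, Node mine) {
        int slot = (int) (lastSequence % announce.length());
        Node other = announce.get(slot);
        return (other != null && !other.isSequenced()) ? other : mine;
    }
}
```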

At the starting point, all three threads' head and announce elements are pointed at the tail node. The lastSequence for each thread is 0.

At this point, Thread 1 is executed with an invocation. It checks the announce array at its lastSequence (zero), which is the slot it is currently scheduled to examine. The node there is already sequenced, so it sequences its own node, and its lastSequence is set to 1.

Thread 2 is now executed with an invocation. It checks the announce array at its lastSequence (zero), sees that it doesn't need to help, and so attempts to sequence its own invocation. It succeeds, and its lastSequence is set to 2.

Thread 3 is now executed, and it also sees that the node at announce[0] is already sequenced, so it sequences its own invocation. Its lastSequence is now set to 3.

Now Thread 1 is invoked again. It checks the announce array at index 1 and finds that it's already sequenced. Concurrently, Thread 2 is invoked; it checks the announce array at index 2 and finds that it is already sequenced as well. Both Thread 1 and Thread 2 now attempt to sequence their own nodes. Thread 2 wins and sequences its invocation; its lastSequence is set to 4. Meanwhile, Thread 3 has been invoked. It checks the index given by its lastSequence (mod 3) and finds that the node at announce[0] has not been sequenced. Thread 2 is again invoked at the same time that Thread 1 is on its second attempt. Thread 1 finds an unsequenced invocation at announce[1], which is the node just created by Thread 2. It attempts to sequence Thread 2's invocation and succeeds. Thread 2 finds its own node at announce[1] already sequenced and sets its lastSequence to 5. Thread 3 is then invoked, finds that the node Thread 1 placed at announce[0] is still not sequenced, and attempts to sequence it. Meanwhile, Thread 2 has also been invoked and pre-empts Thread 3. It sequences its own node and sets its lastSequence to 6.

Poor Thread 1. Even though Thread 3 is trying to sequence it, both threads have been continually thwarted by the scheduler. But at this point, Thread 2 is also now pointing to announce[0] (6 mod 3). All three threads are set to attempt to sequence the same invocation. No matter which thread succeeds, the next node to be sequenced will be the waiting invocation of Thread 1, i.e. the node referenced by announce[0].

This is inevitable. In order for threads to be pre-empted, other threads must be sequencing nodes, and as they do so, they continually move their lastSequence ahead. If a given thread's node is continually not sequenced, eventually all the threads will be pointing to its index in the announce array. No thread will do anything else until the node it is trying to help has been sequenced, so the worst-case scenario is that all threads are pointing to the same unsequenced node. Therefore, the time required to sequence any invocation is a function of the number of threads and not the size of the input.
