Here's an explanation and example of how this is accomplished. Let me know if there are parts that aren't clear.
Gist with source
Universal
Initialization:
Thread indexes are applied in a atomically incremented fashion. This is managed using an AtomicInteger
named nextIndex
. These indexes are assigned to threads through a ThreadLocal
instance which initializes itself by getting the next index from nextIndex
and incrementing it. This happens the first time each thread's index is retrieved the first time.
A ThreadLocal
is created to track the last sequence this thread created. It's initialized 0.
The sequential factory object reference is passed in and stored.
Two AtomicReferenceArray
instances are created of size n
. The tail object is assigned to each reference, having been initialized with the initial state provided by the Sequential
factory. n
is the maximum number of threads allowed. Each element in these arrays 'belongs' to corresponding thread index.
Apply method:
This is the method that does the interesting work. It does the following:
- Create a new node for this invocation: mine
- Set this new node in the announce array at the current thread's index
Then the sequencing loop begins. It will continue until the current invocation has been sequenced:
- find a node in the announce array using the sequence of the last node created by this thread. More on this later.
- if a node is found in step 2 it is not yet sequenced, continue with it, otherwise, just focus on the current invocation. This will only try to help one other node per invocation.
- Whatever node was selected in step 3, keep trying to sequence it after the last sequenced node (other threads may interfere.) Regardless of success, set the current threads head reference to the sequence returned by
decideNext()
The key to the nested loop described above is the decideNext()
method. To understand that, we need to look at the Node class.
Node class
This class specifies nodes in a doubly-linked list. There's not a lot of action in this class. Most of the methods are simple retrieval methods that should be fairly self-explanatory.
tail method
this returns a special node instance with a sequence of 0. It simply acts as a place-holder until an invocation replaces it.
Properties and initialization
seq
: the sequence number, initialized to -1 (meaning unsequenced)
invocation
: the value of the invocation of apply()
. Set upon construction.
next
: AtomicReference
for the forward link. once assigned, this will never be changed
previous
: AtomicReference
for the backward link assigned upon sequencing and cleared by truncate()
Decide Next
This method is only one in Node with non-trivial logic. In a nutshell, a node is offered as a candidate to be the next node in the linked list. The compareAndSet()
method will check if it's reference is null and if so, set the reference to the candidate. If the reference is already set, it does nothing. This operation is atomic so if two candidates are offered at the same moment, only one will be selected. This guarantees only one node will ever be selected as the next one. If the candidate node is selected, it's sequence is set to the next value, and it's previous link is set to this node.
Jumping Back to the Universal class apply method ...
Having called decideNext()
on the last sequenced node (when checked) with either our node or a node from the announce
array, there are two possible occurrences:
1. The node was successfully sequenced
2. Some other thread pre-empted this thread.
The next step is to check whether the node created for this invocation. This could happen because this thread successfully sequenced it or some other thread picked it up from the announce
array and sequenced it for us. If it has not been sequenced, the process is repeated. Otherwise the call finishes up by clearing the announce array for at this thread's index and returning the result value of the invocation. The announce array is cleared to guarantee there are no references to the node left around that would prevent the node from being garbage collected and therefore keep all nodes in the linked list from that point on alive on the heap.
Evaluate method
Now that the invocation's node has been successfully sequenced, the invocation needs to be evaluated. To do that, the first step is to ensure that the invocations preceding this one have been evaluated. If they haven't this thread will not wait but will do that work immediately.
EnsurePrior method
The ensurePrior()
method does this work by checking the previous node in the linked list. If it's state is not set, the previous node will be evaluated. Node that this is recursive. If the node prior to prior node has not been evaluated, it will call evaluate for that node and so on an so forth.
Now that the previous node is known to have a state, we can evaluate this node. The last node is retrieved and assigned to a local variable. If this reference is null, it means that some other thread has pre-empted this one and already evaluated this node; setting it's state. Otherwise, the prior node's state is passed to the Sequential
object's apply method along with this node's invocation. The state returned is set on the node and the truncate()
method is called, clearing the backward link from the node as it is no longer needed.
MoveForward method
The move forward method will attempt to move all head references to this node if they are not already pointing to something further along. This is to ensure that if a thread stops calling, it's head will not retain a reference to an node that is no longer needed. The compareAndSet()
method will make sure we only update the node if some other thread hasn't changed it since it was retrieved.
Announce array and helping
The key to making this approach wait-free as opposed to simply lock-free is that we can't assume that the thread scheduler will give each thread priority when it needs it. If each thread simply attempted to sequence it's own nodes, it's possible that a thread could be continually pre-empted under load. To account for this possibility each thread will first try to 'help' other threads that may be unable to get sequenced.
The basic idea is that as each thread successfully creates nodes, the sequences assigned are monotonically increasing. If a thread or threads are continually pre-empting another thread, the index the use to find unsequenced nodes in the announce
array will move forward. Even if every thread that is currently trying to sequence a given node is continually pre-empted by another thread, eventually all threads will be trying to sequence that node. To illustrate, we'll construct an example with three threads.
At the starting point, all three threads' head and announce elements are pointed at the tail
node. The lastSequence
for each thread is 0.
At this point, Thread 1 is executed with an invocation. It checks the announce array for it's last sequence (zero) which is the node it is currently scheduled to index. It sequences the node and it's lastSequence
is set to 1.
Thread 2 is now executed with an invocation, it checks the announce array at it's last sequence (zero) and sees that it doesn't need help and so attempts to sequence it's invocation. It succeeds and now it's lastSequence
is set to 2.
Thread 3 is now executed and it also sees that the node at announce[0]
is already sequenced and sequences it's own invocation. It's lastSequence
is now set to 3.
Now Thread 1 is invoked again. It checks the announce array at index 1 and finds that it's already sequenced. Concurrently, Thread 2 is invoked. It checks the announce array at index 2 and finds that it is already sequenced. Both Thread 1 and Thread 2 now attempt to sequence their own nodes. Thread 2 wins and it sequences it's invocation. It's lastSequence
is set to 4. Meanwhile, thread three has been invoked. It checks the index it lastSequence
(mod 3) and finds that the node at announce[0]
has not been sequenced. Thread 2 is again invoked at the same time that Thread 1 is on it's second attempt. Thread 1 finds an unsequenced invocation at announce[1]
which is the node just created by Thread 2. It attempts to sequence Thread 2's invocation and succeeds. Thread 2 finds it's own node at announce[1]
and it has been sequenced. It set's it's lastSequence
to 5. Thread 3 is then invoked and finds that node that thread 1 placed at announce[0]
is still not sequenced and attempts to do so. Meanwhile Thread 2 has also been invoked and pre-empts Thread 3. It sequences it's node and sets it's lastSequence
to 6.
Poor Thread 1. Even though Thread 3 is trying to sequence it, both threads have been continually thwarted by the scheduler. But at this point. Thread 2 is also now pointing to announce[0]
(6 mod 3). All three threads are set to attempt to sequence the same invocation. No matter which thread succeeds, the next node to be sequenced will be the waiting invocation of Thread 1 i.e. the node referenced by announce[0]
.
This is inevitable. In order for threads to be pre-empted, other threads must be sequencing nodes and as they do so, they will continually move their lastSequence
ahead. If a given thread's node is continually not sequenced, eventually all the threads will be pointing to it's index in the announce array. No thread will do anything else until the node it is trying to help has been sequenced, the worst case scenario is that all threads are pointing to the same unsequenced node. Therefore, the time required to sequence any invocation is a function of the number of threads and not the size of the input.
Best Answer
Here is a reactive approach to your problem. RxJava allows you to manage resources, especially threads, in a style similar to that used in Java streams.
The request queue for GETs is a
SerializedSubject
which is where requests are sent.SerializedSubject
is thread safe, and allows requests to be made from any thread.Once the subject is declared, set up processing of the queued requests. The
observeOn()
operator tells the observer chain to perform processing on a particular scheduler, which then selects a thread from its pool to perform all the operations on. RxJava takes of the thread-hopping to go from the calling thread to the thread that handles the requests.The
buffer()
operator batches up a set of requests into a list. The batch size is 100, per the original posting. Thebuffer()
operator can take an additional parameter to set a timeout, so that eventually a lone group of requests will be handled and nothing gets stuck. That's a business decision whether you want to handle less than 100 requests at any time.The API request for clients is very simple. What the caller gets back is the equivalent of a future. The client makes the request and establishes a call-back using the
subscribe()
step to handle theJsonResponse
that will eventually come through. An error handler is also required, since ... well, errors happen. TheobserveOn()
operator is used to move the response handling back on to another thread. More on that in a bit.Handling the request is simply batching up the list of URLs, waiting for the list of responses to come back and pairing up the responses to the requests. The code below assumes all responses come back in order. The response is emitted to the client by using
onNext()
followed byonCompleted()
.I mentioned earlier that the client needs to use a scheduler to move processing back to its own thread. How you do this depends on how client threads are set up. You can create the client scheduler from an executor service:
and using
observeOn( clientScheduler )
will cause subsequent operations to move on to the client thread.Summary
RxJava, and similar reactive platforms, manage almost all of the details of threads, locks, semaphores, mutexes, blocking queues, timers, responses, etc, for you. You still have to understand what things are needed for thread safety, and when processing is performed on particular threads, but almost of the finicky, tricky stuff is kept behind the scenes.