Java Multithreading – Threaded Batching Algorithm

Tags: java, multithreading

Ok, I'm working on DynamoDB-based caching. We have an API where each call costs us real money, and the information retrieved stays "fresh" for 2-3 weeks, so it makes sense to cache it.
There's no problem with calling dynamoDbClient.get(apiUrl) and retrieving the stored JSON response. DynamoDB supports batching – 25 items per batch PUT and 100 items per batch GET. So if we need the JSON responses for 100 items, we make one call instead of 100.
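For context, with the AWS SDK v2 a batched GET of up to 100 cached entries might look roughly like the fragment below. The table name "ApiCache", the key attribute "apiUrl", and the value attribute "json" are made-up placeholders for this sketch, not our real schema; apiUrls and dynamoDbClient are assumed to be in scope.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;

// apiUrls holds up to 100 cache keys; dynamoDbClient is a configured DynamoDbClient.
List<Map<String, AttributeValue>> keys = apiUrls.stream()
    .map(url -> Map.of("apiUrl", AttributeValue.builder().s(url).build()))
    .collect(Collectors.toList());

BatchGetItemResponse response = dynamoDbClient.batchGetItem(
    BatchGetItemRequest.builder()
        .requestItems(Map.of("ApiCache",
            KeysAndAttributes.builder().keys(keys).build()))
        .build());

// One round trip returns the stored JSON for every key that was found.
Map<String, String> jsonByUrl = response.responses().get("ApiCache").stream()
    .collect(Collectors.toMap(
        item -> item.get("apiUrl").s(),
        item -> item.get("json").s()));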

The question I have is how to organize this in the best way for threading.

Here's the general idea, and I'm open to suggestions.

Assuming a batch call holds 100 items, we can use a BlockingQueue<String> to store the keys for later use. We would use the blockingQueue.put() method so that, if 150 threads arrive at the method while the batch only has 100 slots available, the extra threads wait.

So a thread that enters the method gets its place in the batch queue and then needs some "locking" mechanism to wait until the response arrives and to be woken up when its response is available.

String pseudoCodeMethod(String resourceUrl) throws InterruptedException {
  blockingQueue.put(resourceUrl);              // blocks if the batch has no free slot
  LockableResponse lockableResponse = getLockableResponse();
  lockableResponse.setKey(resourceUrl);
  lockableResponse.getSemaphore().acquire();   // park until the batch thread releases us

  String jsonApiResponse = lockableResponse.getJsonApiResponse();
  lockableResponse.clean();                    // free the slot for reuse

  return jsonApiResponse;
}

class LockableResponse {
  String resourceUrl;
  String jsonApiResponse;
  Semaphore semaphore = new Semaphore(0);   // 0 permits: acquire() blocks until release()

  // getters/setters (setKey, getSemaphore, getJsonApiResponse, setJsonApiResponse) omitted

  public void clean() {
     resourceUrl = null;
     jsonApiResponse = null;
  }
}

My thought is that we would associate each resourceUrl with a data structure that has a semaphore to await the API response, which arrives for ~100 items in one batch call.

A separate thread performs the call, then iterates through the response, assigns each jsonApiResponse to the proper resourceUrl [a ConcurrentMap for that??], and then calls lockableResponse.getSemaphore().release() to wake up the waiting thread, so that thread picks up its jsonApiResponse and exits the method.

The clean() method could be added to allow reuse of the same structure. I'm thinking of an array of lockable responses sized to the batch capacity, so they could be cleared and reused for the next batch GET call.
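To make the idea above concrete, here is a rough sketch of what that separate batch-worker thread could look like, using a ConcurrentMap keyed by resourceUrl as suggested. It assumes each caller registers its LockableResponse in the pending map before putting its URL on the queue, and that LockableResponse exposes the accessors used in the pseudocode; fetchBatchFromApi() is a placeholder, not real code.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;

// Sketch only: drains up to 100 queued URLs, makes one batch call, then releases every waiter.
class BatchWorker implements Runnable {

  private final BlockingQueue<String> blockingQueue;              // shared with pseudoCodeMethod()
  private final ConcurrentMap<String, LockableResponse> pending;  // resourceUrl -> waiting caller

  BatchWorker(BlockingQueue<String> blockingQueue, ConcurrentMap<String, LockableResponse> pending) {
    this.blockingQueue = blockingQueue;
    this.pending = pending;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      List<String> batch = new ArrayList<>(100);
      try {
        batch.add(blockingQueue.take());        // block until at least one request is queued
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      blockingQueue.drainTo(batch, 99);         // grab up to 99 more without blocking

      Map<String, String> jsonByUrl = fetchBatchFromApi(batch);   // placeholder for the real batch GET

      for (String url : batch) {
        LockableResponse waiter = pending.remove(url);
        waiter.setJsonApiResponse(jsonByUrl.get(url));
        waiter.getSemaphore().release();        // wake the thread parked in acquire()
      }
    }
  }

  /** Placeholder: one API/DynamoDB batch call for up to 100 keys. */
  protected Map<String, String> fetchBatchFromApi(List<String> urls) {
    throw new UnsupportedOperationException("not implemented in this sketch");
  }
}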

Thoughts? Suggestions?

Best Answer

Here is a reactive approach to your problem. RxJava allows you to manage resources, especially threads, in a style similar to that used in Java streams.

The request queue for GETs is a SerializedSubject which is where requests are sent. SerializedSubject is thread safe, and allows requests to be made from any thread.

SerializedSubject<Pair<BehaviorSubject<JsonResponse>, String>, Pair<BehaviorSubject<JsonResponse>, String>> requestGetQueue =
  PublishSubject.<Pair<BehaviorSubject<JsonResponse>, String>>create().toSerialized();

Once the subject is declared, set up processing of the queued requests. The observeOn() operator tells the observer chain to perform processing on a particular scheduler, which then selects a thread from its pool to perform all the operations. RxJava takes care of the thread-hopping needed to go from the calling thread to the thread that handles the requests.

requestGetQueue
  .observeOn( Schedulers.io() )
  .buffer( 100 )  // batch size
  .subscribe( requestList -> processRequestList( requestList ),
    error -> log.error( error ) );

The buffer() operator batches up a set of requests into a list. The batch size is 100, per the original posting. The buffer() operator can take additional parameters to set a timeout, so that eventually a lone group of requests gets handled and nothing is left stuck. Whether you want to process fewer than 100 requests at a time is a business decision.
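For instance, the timed overload would look like the sketch below; the one-second window is an arbitrary value chosen for illustration, not something from the question.

requestGetQueue
  .observeOn( Schedulers.io() )
  .buffer( 1, TimeUnit.SECONDS, 100 )   // flush after 100 requests or 1 second, whichever comes first
  .filter( requestList -> !requestList.isEmpty() )   // the timed variant can emit empty lists
  .subscribe( requestList -> processRequestList( requestList ),
    error -> log.error( error ) );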

Observable<JsonResponse> getApiValue( String url ) {
  BehaviorSubject<JsonResponse> responseFromApi = BehaviorSubject.create();
  requestGetQueue.onNext( new Pair<>( responseFromApi, url ) );
  return responseFromApi;
}

The API request for clients is very simple. What the caller gets back is the equivalent of a future. The client makes the request and establishes a call-back using the subscribe() step to handle the JsonResponse that will eventually come through. An error handler is also required, since ... well, errors happen. The observeOn() operator is used to move the response handling back on to another thread. More on that in a bit.

getApiValue( apiUrlString )
  .observeOn( clientScheduler )
  .subscribe( jsonResponse -> { ... },
    error -> { ... } );

Handling the request is simply a matter of batching up the list of URLs, waiting for the list of responses to come back, and pairing the responses with the requests. The code below assumes all responses come back in order. The response is emitted to the client by using onNext() followed by onCompleted().

void processRequestList( List<Pair<BehaviorSubject<JsonResponse>, String>> requestList ) {
  List<String> uriList = new ArrayList<>();
  for ( int s = 0; s < requestList.size(); s++ ) {
    uriList.add( requestList.get(s).getSecond() );   // collect the URL from each queued request
  }
  List<JsonResponse> results = sendBatchRequest( uriList );
  for ( int i = 0; i < requestList.size(); i++ ) {
    requestList.get(i).getFirst().onNext( results.get(i) );   // emit the response to the waiting client
    requestList.get(i).getFirst().onCompleted();
  }
}

I mentioned earlier that the client needs to use a scheduler to move processing back to its own thread. How you do this depends on how client threads are set up. You can create the client scheduler from an executor service:

clientScheduler = Schedulers.from( executorService );

and using observeOn( clientScheduler ) will cause subsequent operations to move on to the client thread.
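Putting the pieces together on the client side might look like the sketch below; the single-thread executor and handleResponse() are illustrative placeholders, not part of the original design.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Scheduler clientScheduler = Schedulers.from( executorService );

getApiValue( apiUrlString )
  .observeOn( clientScheduler )
  .subscribe( jsonResponse -> handleResponse( jsonResponse ),   // runs on the executor's thread
    error -> log.error( error ) );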

Summary

RxJava, and similar reactive platforms, manage almost all of the details of threads, locks, semaphores, mutexes, blocking queues, timers, responses, etc., for you. You still have to understand what is needed for thread safety, and when processing happens on particular threads, but almost all of the finicky, tricky stuff is kept behind the scenes.
