.NET Distributed Computing – Managing Long-Running Jobs

distributed-computing, .net

Scenario

Various types of files containing customer data are uploaded via FTP, and a POST is made to a REST API to update a database table, which tells the system (a web app) which of those files are new.

Users of the web app are notified of those new files, and can choose to import them into the app's db at any time. They can only import them from the oldest to the newest, but can freely choose to import only a subset of them (don't ask me why, this is a requirement and it also happens to be a key aspect of the issue I'm facing).
When the user chooses which files to import, said table gets updated with a timestamp which signals to the system that the file has to be imported.

A Windows service queries the table every X seconds and, if it finds files flagged with the timestamp, retrieves all the enabled files belonging to the customer with the oldest one, sorts them by creation date (which is embedded in the file name) for good measure, and imports them. The process then repeats for the next customer with enabled files, and so on and so forth.

Problem

Now, importing those files can take a rather long time, with the net result that import requests queue up and, being processed serially, can take hours to be fulfilled. Since it's accounting and payroll data we're talking about, customers aren't willing to wait that long.

We definitely need to speed things up.

What can't be done

The low-hanging fruit would appear to be parallelizing the import of individual files belonging to the same customer, but that can't be done, because each file contains data about a specific month and, in many cases, July's file has to be imported before August's. In other words, most of the file types we're dealing with have a temporal dependency on one another (and if they don't now, they might in the future).

What could be done

Another idea is to keep a queue of the import requests, have some workers available, and parallelize the import by customer.

But here the requirement I wrote about earlier comes into play: the customer, while constrained to make individual import requests in chronological order, can choose to import only a subset of them.

Say Alice sees there are 6 new files to import, and decides to import 3 of them. Ok, the background job gets queued, worker X is available and grabs the task and executes it. But let's say that Alice decides to import the other 3 files while worker X is still busy. The request gets queued and worker Y, who happens to be available, starts to work on those 3 files.

What would happen is that data could be imported in the wrong order, leading to all sorts of problems.

What I need, I guess, is a way to somehow say:

"If job A is currently being executed for customer X, a new job B for the same customer X needs to be kept in the queue until A is done. If, however, customer Y creates a new job C, it can be picked up as soon as a worker frees up."

But why don't you make the import faster?

It's a huge and complex piece of software and, while I've been able to make it faster, it has to execute a lot of IronPython scripts, which implies a great deal of overhead. It could certainly be made faster still, but from the profiling I did, script execution is the chief bottleneck, and we simply can't afford to move all that logic to C# (not least because it's heavily customized for each customer).

Then why don't you make the import itself parallel?

While it's true that the import process is typically composed of multiple steps, some of those steps have a temporal dependency of their own, meaning that some need to be executed first (e.g. I can't import payroll data if new hires haven't been imported first).
I went through the trouble of making the process transactional, so interrupting any of those steps won't actually do much harm, but that was very little fun. Making it parallel would require extensive changes to the code, and I would prefer to leave it untouched, as it's pretty hairy and has no tests whatsoever.

Why don't you just use Hangfire?

Because Hangfire can't handle dependencies between tasks. In the future I will have to enqueue tasks that have absolutely nothing to do with one another, and for those I will be able to use Hangfire.

Ok then…

It's clear that things could, in theory, be parallelized at one or more of these three levels:

  1. customer, which owns
  2. files, each of which takes many simpler
  3. steps to be imported

All three levels share the same problem: temporal coupling. If they didn't, things would be that much easier.
The third would imply changes to the import code, which I would rather not make. The import takes a list of files, so I would much rather parallelize at the first two levels, the easiest being the customer level.

My questions

Assume I'm going to use ZeroMQ/RabbitMQ or something of that sort. I could store the IDs of the files to import and enqueue them.

But:

  • how do you build one of those workers, anyway? Is it going to be a thread spawned by my already-existing Windows service which sits there idle polling the queue until there's a task to execute?

  • could such a design scale across machines, should the need arise to throw more hardware at it?

  • if that's the case, I guess I should deploy a new instance of my service on the other machines, right?

  • how do you establish dependencies between jobs?

  • how can I see if a job for customer X is already executing and, in that case, leave any other job for the same customer in the queue? Sounds like writing a flag to a database table would lead to race conditions.

  • I guess I should take measures to somehow take and persist a snapshot of the queue every X minutes, in case the server goes down or needs to be rebooted.

Bear with me: I've never had the chance to do any distributed programming, and reading here and there I've only found vague explanations.

Sorry about the length of this question, but I wanted to narrow down the specific scenario.

Best Answer

how do you build one of those workers, anyway? Is it going to be a thread spawned by my already-existing Windows service which sits there idle polling the queue until there's a task to execute?

how can I see if a job for customer X is already executing and, in that case, leave any other job for the same customer in the queue? Sounds like writing a flag to a database table would lead to race conditions.

With a per-customer worker, the worker itself is pretty straightforward since it just contains a running job and a FIFO queue of items to process. On the .NET platform, the worker could be represented by:

  • ActionBlock from the TPL Dataflow library
  • MailboxProcessor from F#
  • Task + Queue<T> + locking (on enqueue, dequeue, and task complete)
    • Note that ConcurrentQueue<T> does exist, but in this type of code the benefits are negligible in my experience. The queue itself is concurrent, but you still have to lock to update your own statistics variables, and then the usage pattern gets uncomfortable.
  • A number of other more basic components like threads, wait handles, etc. (not recommended).

The first two components are pre-built to execute only one job at a time while queuing up the others.

You can then use a Dictionary<CustomerId, Worker> to represent what's happening overall.
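
To make the per-customer worker concrete, here's a minimal C# sketch using ActionBlock, under the assumption that the customer ID is an integer; ImportRequest, ImportDispatcher, and RunImport are illustrative names, not anything from an existing codebase:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

    // Illustrative request type; stands in for whatever the service already passes around.
    public record ImportRequest(int CustomerId, IReadOnlyList<int> FileIds);

    public class ImportDispatcher
    {
        private readonly ConcurrentDictionary<int, ActionBlock<ImportRequest>> _workers = new();

        public void Enqueue(ImportRequest request)
        {
            // One block per customer. ActionBlock's default MaxDegreeOfParallelism
            // is 1, so each customer's jobs run one at a time in FIFO order,
            // while different customers run in parallel.
            var worker = _workers.GetOrAdd(request.CustomerId,
                _ => new ActionBlock<ImportRequest>(RunImport));
            worker.Post(request);
        }

        private Task RunImport(ImportRequest request)
        {
            // Call into the existing (serial) import code here.
            Console.WriteLine($"Importing {request.FileIds.Count} file(s) for customer {request.CustomerId}");
            return Task.CompletedTask;
        }
    }

A job submitted while a customer's block is busy simply waits in that block's FIFO queue, which is exactly the "keep B until A is done" rule from the question, while other customers' blocks keep running in parallel.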

I actually built this in F# using MailboxProcessor. It keeps one agent per whatever key you can pull from the message, such as CustomerId, and cleans up idle agents automatically.

could such a design scale across machines, should the need arise to throw more hardware at it?

Scaling across machines can be done in various ways. What you will likely want is partitioning. The easiest way is to hash on some request value to decide which server to send it to. For instance, if your customer ID is an integer (or can be consistently reduced to one, e.g. with GetHashCode) and you have 3 machines processing tasks, you can use a simple modulus to decide which customer goes to which machine: machineNumber = customerId % 3. Customer IDs 1, 4, 7, etc. will then always go to machine 1. However, if machine 1 goes down, a third of the customer requests won't get processed until it comes back up. Since these are long-running imports anyway, that's likely not a big deal. The load also won't be distributed evenly, since some customers are usually heavier users than others. Again, probably not a huge deal. Measure to make sure.
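
A minimal sketch of that routing, assuming an integer customer ID and a machine count every node agrees on (the helper name is mine). One caveat if you reduce a string ID via GetHashCode: on newer .NET runtimes string hash codes are randomized per process, so use a deterministic hash instead, or different nodes may compute different assignments:

    // Modulo partitioning sketch (illustrative helper, not a library API).
    static int MachineFor(int customerId, int machineCount) =>
        Math.Abs(customerId % machineCount);

    // MachineFor(1, 3) == 1, MachineFor(4, 3) == 1, MachineFor(7, 3) == 1:
    // customers 1, 4, 7, ... always land on machine 1.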

Another way, one that is resilient to failure, is to use a distributed directory. It keeps track of which node currently owns which customer; Project Orleans uses a mechanism like this. It allows nodes to fail and customers to be transitioned to another node. Before allocating a new customer to a node, you can also query the nodes to see which is the least loaded. However, I'm not aware of a pre-built component for this purpose, and building it yourself is perilous to your time.

if that's the case, I guess I should deploy a new instance of my service on the other machines, right?

Correct. With partitioning kept separate from the worker, the same worker service code can run on all servers. The worker service remains ignorant of how it is partitioned.

how do you establish dependencies between jobs?

The dependency you've described so far is only temporal, and your application prevents the user from submitting files out of order, so I'm not sure I understand your need here. The worker components described above process jobs one at a time, in order. As long as you submit them in temporal order, you're good.

When I have done dependencies in the past (example: if FileTypeB and FileTypeA are both in the queue, then FileTypeA must be processed first), I was using a custom worker where I kept multiple data structures to represent the queue of work. For example, a Dictionary<CustomerId, Tuple<Worker, List<FileRequest>>> (not that I would actually use a tuple here). Basically, each customer had a worker and a list of outstanding requests. When the worker became available, I would scan the outstanding requests, determine dependencies, and pick the one with the highest priority (the most depended-on), all under lock. Then the chosen file was processed.
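
Here's a rough C# sketch of that scan-under-lock selection, assuming a simple rank per file type (A before B before C, matching the chain shown further down); FileRequest, FileType, and the rank table are illustrative stand-ins, not your actual model:

    using System;
    using System.Collections.Generic;
    using System.Linq;

    public enum FileType { A, B, C }
    public record FileRequest(FileType Type, DateTime FileDate);

    public class CustomerPendingWork
    {
        // C depends on B, which depends on A, so A ranks lowest and runs first.
        private static readonly Dictionary<FileType, int> Rank = new()
            { [FileType.A] = 0, [FileType.B] = 1, [FileType.C] = 2 };

        private readonly object _gate = new();
        private readonly List<FileRequest> _pending = new();

        public void Add(FileRequest request)
        {
            lock (_gate) _pending.Add(request);
        }

        // Called when this customer's worker frees up: pick the most depended-on
        // request, breaking ties by oldest file date, all under lock.
        public FileRequest? TakeNext()
        {
            lock (_gate)
            {
                var next = _pending.OrderBy(r => Rank[r.Type])
                                   .ThenBy(r => r.FileDate)
                                   .FirstOrDefault();
                if (next is not null) _pending.Remove(next);
                return next;
            }
        }
    }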

For the F# MailboxProcessor, there is also the Scan method, which can be used to scan the submitted messages until a desirable one is found. However, this probably wouldn't work as well for an exhaustive search like finding the lowest date.

As far as how to represent dependencies, a tree structure is common.

FileTypeC -> FileTypeB -> FileTypeA

I guess I should take measures to somehow take and persist a snapshot of the queue every X minutes, in case the server goes down or needs to be rebooted.

Instead of periodically persisting the queue, just save each request as it comes in and update it as it changes phases, e.g. queued/complete. On restart, you can query the database for incomplete jobs and resubmit them to workers. I might also add some guards to the worker (especially if it is on a separate server) so that when a job is submitted, it checks its internal state for the same job (e.g. by job id) and responds with a "No, this job is already queued/running/complete" as necessary.
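
A minimal in-memory sketch of that guard plus the phase tracking (all names are illustrative; in practice the records would live in a database table so incomplete jobs survive a restart):

    using System;
    using System.Collections.Generic;
    using System.Linq;

    public enum JobPhase { Queued, Running, Complete }

    // Illustrative record; in practice this is a row in a jobs table.
    public record JobRecord(Guid JobId, int CustomerId, JobPhase Phase);

    public class JobTracker
    {
        private readonly object _gate = new();
        private readonly Dictionary<Guid, JobRecord> _jobs = new();

        // Returns false if the job is already known (queued/running/complete),
        // guarding against duplicate resubmission after a restart.
        public bool TrySubmit(JobRecord job)
        {
            lock (_gate)
            {
                if (_jobs.ContainsKey(job.JobId)) return false;
                _jobs[job.JobId] = job with { Phase = JobPhase.Queued };
                return true;
            }
        }

        public void SetPhase(Guid jobId, JobPhase phase)
        {
            lock (_gate) _jobs[jobId] = _jobs[jobId] with { Phase = phase };
        }

        // On restart: everything not yet Complete gets resubmitted to the workers.
        public List<JobRecord> Incomplete()
        {
            lock (_gate) return _jobs.Values.Where(j => j.Phase != JobPhase.Complete).ToList();
        }
    }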
