Since you are moving away from a single master node (which is appropriate), you will have to change a few things. You will need to set up a quorum, and since you already have 9 nodes you are in good shape. For a quorum to work you need 2n+1 nodes, where n is the number of nodes that can go down while the system keeps working. Within the quorum a vote takes place on who the leader is and which transactions are successful. This can also be used to pass around configuration information and keep everyone synchronized without a database.
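The 2n+1 arithmetic is easy to sanity-check in code. A minimal sketch (the class and method names are my own, just illustrating the rule above):

```java
// Majority-quorum arithmetic: with 2n+1 nodes, n nodes may fail while a
// majority of n+1 keeps operating. With your 9 nodes, 4 may go down.
final class Quorum {
    // how many nodes can fail before the cluster loses its majority
    static int tolerableFailures(int clusterSize) {
        return (clusterSize - 1) / 2;
    }

    // how many nodes must agree for a vote (leader election, commit) to pass
    static int majoritySize(int clusterSize) {
        return clusterSize / 2 + 1;
    }
}
```

So with 9 nodes the cluster tolerates 4 failures and still has 5 voters, which is why odd cluster sizes are preferred.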
There are existing technologies out there that can help you with this. One of those is ZooKeeper, an open source (Apache v2) product for distributed coordination. You will need something along these lines; whether you use ZooKeeper or roll your own, its white papers will be invaluable. It can also be used to maintain the configuration information about each node.
ZooKeeper is written in Java, but I have created a project (ZooKeeperNet) that allows it to be embedded within a .NET application using IKVM. If this isn't acceptable, then you'll want to read about leader elections when determining who the current master node will be. I suggest reading all their wiki pages and recipes to get an idea of what you need to account for in a proper distributed system.
Just so you have a good understanding: ZooKeeper is the backing coordination system of Hadoop and HBase, and Hadoop is a distributed Map/Reduce framework.
If you aren't already, you can use WCF ad hoc or registry discovery when attempting to find the current master node in your system. If only a single master node is alive, it will be the only one registered to support the IMaster features. Your slave nodes can then watch each other's znodes for one going away, with one of them picking up the master role almost immediately.
Keep in mind that to be highly efficient, the data each node works with has to be close to that node (i.e. on the node itself). If one node acts as a data intermediary, you won't be as efficient as when the nodes can pull data in a distributed fashion.
how do you build one of those workers, anyway? Is it going to be a thread spawned by my already-existing Windows service which sits there idle polling the queue until there's a task to execute?
how can I see if a job for customer X is already executing and, in that case, leave any other job for the same customer in the queue? Sounds like writing a flag to a database table would lead to race conditions.
With a per-customer worker, the worker itself is pretty straightforward since it just contains a running job and a FIFO queue of items to process. On the .NET platform, the worker could be represented by:
- ActionBlock from the TPL DataFlow library
- MailboxProcessor from F#
- Task + Queue<T> + locking (on enqueue, dequeue, and task complete)
- Note that ConcurrentQueue<T> does exist, but in this type of code the benefits are negligible in my experience. It's concurrent for the queue, but you still have to lock to update your own statistics variables, and then the usage pattern gets uncomfortable.
- A number of other more basic components like threads, wait handles, etc. (not recommended).
The first two components are pre-built to execute only one job at a time while queuing up the others.
You can then use a Dictionary<CustomerId, Worker> to represent what's happening overall.
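The shape of this design can be sketched in Java (class names and the exact locking structure here are my own invention, not a library API): each customer gets a worker holding a FIFO queue and a running flag, a lock serializes enqueue/dequeue/completion, and a concurrent map plays the role of the Dictionary<CustomerId, Worker>.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// One worker per customer: a FIFO queue plus a "running" flag, guarded by
// a lock so at most one job per customer executes at a time.
class CustomerWorker {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean running = false;
    private final Executor pool;

    CustomerWorker(Executor pool) { this.pool = pool; }

    synchronized void submit(Runnable job) {
        pending.add(job);
        if (!running) {        // no job in flight for this customer: start draining
            running = true;
            pool.execute(this::drain);
        }
    }

    private void drain() {
        while (true) {
            Runnable next;
            synchronized (this) {
                next = pending.poll();
                if (next == null) { running = false; return; }
            }
            next.run();        // jobs for this customer run strictly in FIFO order
        }
    }
}

// The Dictionary<CustomerId, Worker> from the text.
class WorkerRegistry {
    private final Executor pool;
    private final Map<Integer, CustomerWorker> workers = new ConcurrentHashMap<>();

    WorkerRegistry(Executor pool) { this.pool = pool; }

    void submit(int customerId, Runnable job) {
        workers.computeIfAbsent(customerId, id -> new CustomerWorker(pool)).submit(job);
    }
}
```

Jobs for different customers can run in parallel on the pool, while jobs for the same customer serialize, which is exactly what ActionBlock and MailboxProcessor give you pre-built.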
I actually built this in F# using MailboxProcessor. It keeps an agent per whatever key you can pull from the message, such as CustomerId, and cleans up idle agents automatically.
could such a design scale across machines, should the need arise to throw more hardware at it?
Scaling across machines can be done in various ways; most likely what you will want is partitioning. The easiest way is hashing on some request value to decide which server to send to. For instance, if your customer ID is an integer (or can be consistently reduced to one, e.g. with GetHashCode) and you have 3 machines processing tasks, you can use a simple modulus to decide which customer goes to which machine: machineNumber = customerId % 3. Customer IDs 1, 4, 7, etc. will always go to machine 1. However, if machine 1 goes down, 1/3 of the customer requests will not get processed until it comes back up. Since these are long-running imports anyway, that's likely not a big deal. The load will also not be distributed evenly, since there are usually some customers who are heavier users. Again, probably not a huge deal; measure to make sure.
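The modulus routing is a one-liner; the only wrinkle worth coding defensively is a negative hash code (Partitioner and machineFor are hypothetical names for illustration):

```java
// Route a customer to one of N machines by modulus. floorMod keeps the
// result non-negative even when the input is a negative hash code.
final class Partitioner {
    static int machineFor(int customerId, int machineCount) {
        return Math.floorMod(customerId, machineCount);
    }
}
```

One caveat if you reduce a string key to an integer: the hash must be stable across processes and versions, or a redeploy will silently reshuffle customers between machines.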
Another way that is resilient to failure is to use a distributed directory. It keeps track of which node currently owns which customer. Project Orleans uses a mechanism like this. It allows for nodes to fail and customers to be transitioned to another node. Before allocating a new customer on a node, you can also query the node to see which is the least loaded. However, I'm not aware of a pre-built component for this purpose, and building it yourself is perilous to your time.
if that's the case, I guess I should deploy a new instance of my service on the other machines, right?
Correct. With partitioning kept separate from the worker, the same worker service code can run on all servers; the worker service is ignorant of how it is partitioned.
how do you establish dependencies between jobs?
The dependency you've described so far is only temporal, and your application prevents the user from submitting files out of order. So I am not understanding your need here. The worker components described above process jobs one at a time in order. So long as you submit them in temporal order, you're good.
When I have done dependencies in the past (example: if FileTypeB and FileTypeA are both in the queue, then FileTypeA must be processed first), I used a custom worker where I kept multiple data structures representing the queue of work, for example a Dictionary<CustomerId, Tuple<Worker, List<FileRequest>>> (not that I would actually use Tuple here). Basically, each customer had a worker and a list of outstanding requests. When the worker became available, I would scan the outstanding requests, determine dependencies, and pick the one with the highest priority (the most depended on), all under lock. Then the chosen file was processed.
For the F# MailboxProcessor, there is also a Scan method which can be used to scan the submitted messages until a desirable one is chosen. However, this probably wouldn't work as well for an exhaustive search, such as finding the lowest date.
As far as how to represent dependencies, a tree structure is common.
FileTypeC -> FileTypeB -> FileTypeA
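The "scan and pick the most depended-on request" step can be sketched as follows, under the assumption that the chain above fully determines priority (DependencyPicker and its priority table are illustrative, not from any library; a type missing from the table would need handling in real code):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class DependencyPicker {
    // Priority derived from the chain FileTypeC -> FileTypeB -> FileTypeA:
    // FileTypeA is the most depended on, so it gets the lowest number and runs first.
    static final Map<String, Integer> PRIORITY = Map.of(
        "FileTypeA", 0,
        "FileTypeB", 1,
        "FileTypeC", 2);

    // Called under the worker's lock: scan the outstanding requests and
    // return the most depended-on one, or null if nothing is outstanding.
    static String pickNext(List<String> outstanding) {
        return outstanding.stream()
            .min(Comparator.comparingInt(PRIORITY::get))
            .orElse(null);
    }
}
```

The scan happens under the same lock that guards the outstanding-request list, so a concurrently arriving request can't be missed or double-picked.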
I guess I should take measures to somehow take and persist a snapshot of the queue every X minutes, in case the server goes down or needs to be rebooted.
Instead of periodically persisting the queue, just save each request as it comes in and update it as it changes phases, e.g. queued/complete. On restart, you can query the database for incomplete jobs and try to resubmit them to workers. I might add some guards to the worker (especially if it is on a separate server) so that when a job is submitted, the worker checks its internal state for the same job (e.g. by job id) and responds with "No, this job is already queued/running/complete." as necessary.
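A rough in-memory sketch of that bookkeeping (JobStore is a hypothetical name; a real version would back the map with a database table so it survives a restart):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Track each request's phase as it moves through the pipeline, so restarts
// can resubmit incomplete jobs and duplicate submissions can be rejected.
class JobStore {
    enum Phase { QUEUED, RUNNING, COMPLETE }

    private final Map<String, Phase> jobs = new ConcurrentHashMap<>();

    // Guard: returns false if the job id is already queued/running/complete.
    boolean submit(String jobId) {
        return jobs.putIfAbsent(jobId, Phase.QUEUED) == null;
    }

    void setPhase(String jobId, Phase phase) {
        jobs.put(jobId, phase);
    }

    // On restart: everything that never reached COMPLETE gets resubmitted.
    List<String> incomplete() {
        return jobs.entrySet().stream()
            .filter(e -> e.getValue() != Phase.COMPLETE)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }
}
```

Because the guard is putIfAbsent on the job id, resubmitting an incomplete job after a crash is safe: the duplicate submit simply returns false.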
Best Answer
Writing a basic queuing system is fairly simple, but as you noted with all of the challenges above, doing it right is another matter. I've used home-grown systems for which I wrote the source code, 3rd-party systems, and various JMS providers. JMS (Java Message Service) is by far the most complete solution I've encountered thus far. Much of what you ask for is available in JMS. My favorite JMS provider is ActiveMQ: free, performant, easy to install, and, more importantly, easy to embed in my app with Spring. JMS providers don't provide everything you asked for out of the box, but they provide a set of tools to handle much of it should your application need it. I haven't found that many applications need everything you listed. Ordering might not be important (it's best if it isn't), durable topics might not be important, guaranteed delivery, etc. You just have to stick to the problem and use what it demands.
http://activemq.apache.org/what-open-source-integration-solution-works-best-with-activemq-.html
Does it have strong or loose ordering? Yes, it has both, depending on your program's needs. Here are the details: http://activemq.apache.org/total-ordering.html.
Does it have idempotent put? No, but this is trivial to implement in your application layer should you need that.
Can we have more queues than what can fit on a single machine? Yes. You can have clustered servers, and if you wanted to set up multiple machines with different queues you could, and pull from either.
Can we have more data in a queue than what can fit on a single machine? Yes. Most JMS providers use some sort of DB/persistent storage to ensure messages aren't dropped or lost if the JMS provider goes down.
How many machines can crash before we potentially lose data? This is a little harder to answer because it's timing-related. However, you can crash a JMS provider and, provided the disk isn't corrupt, it will come back up and start where it received the last commit. This means messages could be delivered twice, but if you code your app to handle this it's not a problem. As long as you have at least one of each type (producers, consumers, and JMS servers) it will complete. You can also have load-balancing/failover for redundancy should a disk go out on you.
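Coding the app to handle duplicate delivery can be as simple as remembering processed message ids (DedupFilter is a hypothetical name; in production the set would need persistence or expiry, but the shape is the same):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Idempotent consumer: a redelivered message (after a crash or failover)
// is recognized by its id and applied only once.
final class DedupFilter {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Returns true if the handler ran, false if the id was seen before.
    boolean process(String messageId, Runnable handler) {
        if (!processed.add(messageId)) {
            return false; // duplicate delivery: skip
        }
        handler.run();
        return true;
    }
}
```

This turns the broker's at-least-once redelivery into effectively-once processing from the application's point of view.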
Can it tolerate net-splits? I think I understand what you mean by "net-split", but I'm not entirely sure. I guess you mean that if the JMS servers are clustered and we lose connection with one of the servers, will the client jump to another server and pick up where it left off? Yes, but again these situations can lead to duplicate messages depending on the point at which the client lost the connection.
Can it automatically reconcile data when a net-split is fixed? If you are using transacted sessions, it will redeliver to existing clients only those messages that have not had a commit called on them.
Can it guarantee delivery when clients can crash? Yes, this is one of the main goals of JMS. Guaranteed delivery means that if a message is queued, it's guaranteed to be handled by a client.
Can it guarantee that the same message is not delivered more than once? Yes, if transacted sessions are being used. That means a client has accepted the message and called commit/rollback; once commit is called, it won't redeliver the message.
Can a node crash at any given point, come back up, and not send out junk? In the case of durable clustered queues, yes: it won't spew "junk" if the other node in the cluster has delivered the message, though it can still redeliver anything that hasn't been acknowledged.
Can you add nodes to, or remove nodes from, a running cluster without down time? Yes.
Can you upgrade nodes in a running cluster without down time? This is a little trickier for me to answer, but I believe that yes you can do this.
Can it run without problems on heterogeneous servers? What does this mean exactly? I've found most JMS providers are very easy to run in environments using different hardware, OSes, etc. If you mean performance, though, that's a whole other matter. Any distributed processing system can be negatively impacted by a slow node. I had two 8-core Intel servers running the queue and the consumers, 16 cores together, and I got better performance from using only those two boxes than when I added a single-core machine as a consumer. That single-core machine was so much slower it slowed down the entire grid by a factor of 2x. This had nothing to do with JMS per se.
Can you “stick” queues to a group of servers? Short answer: yes. I can think of a way where you run a cluster that lives only in the European data center and configure the queue there. Then, in your Spring config, set up your consumers to consume that queue as well as other queues on other clusters. You might want to consult the docs:
http://activemq.apache.org/clustering.html
Can it make sure to put data replicas in at least two datacenters, if available? Again, I believe so, but it's best to consult the clustering docs.
Again, JMS has lots of options you can tweak as your needs dictate. Using transacted sessions and durable queues comes with a performance cost; I've seen turning on all the bells and whistles impact performance as much as 10x. When I used JBossMQ, turning off some of these features got us around 10,000 messages/s, but turning them on brought us down to 1,000 messages/s. Big drop.