It sounds to me like your concurrency issue could be solved by using one queue per producer, where each producer queue refuses to dole out another work item until the previous one has completed. Then you just need each consumer to select fairly from amongst the non-empty producer queues. This should be pretty efficient if all producers generate work items at roughly the same rate and all work items require roughly the same amount of processing by the consumer.
If different producers generate work items at different rates, or their items require different amounts of processing, you may need to adjust the selection priority so that low-rate producers (or those producing higher-complexity work units) don't effectively get priority over high-rate (or low-workload) producers.
One option is for consumers to select the producer queue with the most items, rather than merely any non-empty queue; unfortunately this can introduce other kinds of inefficiency. How you prioritize the next work item will therefore need tweaking according to the dynamics of the system.
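The basic scheme above can be sketched in a few lines of Ruby. This is a hypothetical, simplified illustration (two producers, a single consumer draining the queues round-robin after the producers finish); a real system would interleave production and consumption, but the fair one-item-per-queue-per-pass selection is the same:

```ruby
require 'thread'

# One queue per producer; the consumer round-robins across the
# non-empty queues so no single producer monopolises it.
producer_queues = Array.new(2) { Queue.new }

producers = producer_queues.each_with_index.map do |q, i|
  Thread.new do
    3.times { |n| q << "producer-#{i} item-#{n}" }
  end
end
producers.each(&:join)

processed = []

# Fair selection: take at most one item from each non-empty queue
# per pass, until every queue is drained.
until producer_queues.all?(&:empty?)
  producer_queues.each do |q|
    processed << q.pop(true) unless q.empty?
  end
end
```

With both producers pushing at the same rate, the consumer ends up alternating between them rather than draining one queue completely before touching the other.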
The answer by scriptin is giving a reasonable solution, but doesn't answer your questions about the failure of the implementation you had.
- You get a 'queue empty' error when you call pop on a Ruby queue in non-blocking mode (true passed) and there are no items on the queue.
- Your "producer" thread is created and your "consumer" thread is created, but they'll run independently and start executing whenever Ruby feels like it.
- If your "consumer" runs before the "producer" has pushed anything onto the queue, then obviously the "consumer" will pop an empty queue and throw an exception.
The solution to this one is easy: the consumer shouldn't use non-blocking mode. Don't pass true, just call pop(). The whole point of the consumer thread is that it can block on a queue waiting for the producer, without the rest of the application halting.
As for calling lastfm.track.scrobble - yes, that executes the method immediately and pushes its result onto the queue, which is exactly what you don't want :-D. It's also why your consumer thread "reliably" gets to pop its queue before the producer ever pushes anything: even if the producer thread runs first, it hits a slow scrobble call and sits waiting for HTTP requests/responses to complete, so the consumer thread executes and tries to read from the empty queue.
You want to push the parameters for scrobble onto the queue, then have the consumer thread do the scrobbling. It's "nicer"/more structured to make little classes encapsulating the work and push instances of those onto your queue, but a cheap-and-nasty solution just uses the Hash which scrobble takes as input. We also need to remember to loop over the queue entries inside the consumer, instead of only processing one! Thus (UNTESTED) we might have:
queue = SizedQueue.new(10)

Thread.new do
  songs.each do |song|
    queue << { artist: song[:artist], track: song[:track] }
  end
  queue << "done"
end

consumer = Thread.new do
  loop do
    scrobble_args = queue.pop()
    Thread.exit if scrobble_args == "done"
    lastfm.track.scrobble(scrobble_args)
    puts "Scrobbled #{scrobble_args[:track]} by #{scrobble_args[:artist]}."
  end
end

consumer.join
This should technically work but is rather pointless, because that one lonely consumer thread just processes the queue serially. So let's simplify. Really, you just want to dump all of 'songs' into the queue and have consumer threads do the work. Since we want to run in parallel, we'll make an array of threads.
# I'm not bothering with the producer thread here so the queue size is unlimited.
# You could just as well put the 'songs.each' inside a producer thread with a
# sized queue, as per your original code, if you wanted.
#
queue = Queue.new

songs.each do | song |
  queue << song
end

consumers = []

1.upto( 10 ) do
  consumers << Thread.new do
    # Keep looping until asked to exit
    #
    loop do
      song = queue.pop()
      Thread.exit if song == 'done'
      lastfm.track.scrobble( artist: song[ :artist ], track: song[ :track ] )
      puts "Scrobbled #{ song[ :track ] } by #{ song[ :artist ] }."
    end
  end

  # Push one 'done' exit instruction for each consumer
  #
  queue << 'done'
end

# Wait for all consumer threads to complete
#
consumers.each do | consumer |
  consumer.join()
end
...and that should do the trick. The "Parallel" gem will probably be doing much the same sort of thing under the hood.
Best Answer
The difference between those choices lies in the affinity of task assignments to threads.
As I explained in an earlier question, it is entirely your choice whether to implement this affinity or not. There are ways to implement multithreading without any affinity of tasks to threads.
Producer-Consumer pattern is suitable if:
To maximize throughput(*), one generally will try:
(*) Maximizing throughput - the number of video frames processed per second - is the ultimate goal, not maximizing CPU utilization.
Some optimizations, such as algorithm improvements, will decrease the CPU utilization of a given stage while increasing overall efficiency, so that the same result can be computed with fewer total CPU instructions executed.
(*) The reordering queue is necessary because when two pieces of data, tagged [0] and [1] respectively, are processed, sometimes the output for [1] will finish ahead of time. To preserve the ordering property of the pipeline, the output for [1] has to be held back until the output for [0] is ready.
If these changes are not sufficient to maximize throughput to the desired level, one could also move away from the single-pipeline producer-consumer pattern, and move to a dataflow or data-graph framework.
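The hold-back logic of such a reordering stage can be sketched in a few lines of Ruby (the sequence tags and "frame-N" payloads here are hypothetical, purely to illustrate the idea):

```ruby
# Results arrive tagged with sequence numbers, possibly out of order,
# and must be released strictly in order.
pending      = {}   # sequence number => result, held back until its turn
next_to_emit = 0
emitted      = []

# Simulate out-of-order completion: [1] finishes before [0].
arrivals = [[1, "frame-1"], [0, "frame-0"], [2, "frame-2"]]

arrivals.each do |seq, result|
  pending[seq] = result

  # Release every result whose predecessors have all been emitted.
  while pending.key?(next_to_emit)
    emitted << pending.delete(next_to_emit)
    next_to_emit += 1
  end
end
```

Here "frame-1" sits in the pending map until "frame-0" arrives, at which point both are released, preserving pipeline order.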
In a more general dataflow or data-graph framework:
Disclaimer: some of the terminology used here may be loose or incorrect. Corrections are welcome.