In this simple case, yes: there's no real benefit in using threads other than showing how threads work. It's still interesting, though: P's event loop keeps on running while the actual work is done by C. This is what you want in order to keep the user interface (which P might very well be) responsive.
Things get more interesting with concrete examples. Suppose C consumes data by storing it in a buffer, and dumps the buffer to a file once it holds enough data. This dump might take a while, but P can just continue its business of putting data in the queue, without noticing that C is busy and without any slowdown.
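To make the buffering scenario concrete, here is a minimal Ruby sketch. Everything in it is an assumption for illustration: the names `start_buffering_consumer` and `run_buffer_demo`, the batch size of 3, the use of `nil` as an end-of-data marker, and a `StringIO` standing in for the file.

```ruby
require 'stringio'

# Arbitrary illustrative batch size (an assumption, not from the answer).
BATCH_SIZE = 3

# Hypothetical consumer C: buffers items and dumps them to `sink` in batches.
# `sink` is any IO-like object that responds to #puts.
def start_buffering_consumer(queue, sink)
  Thread.new do
    buffer = []
    # A nil item is our assumed signal that no more data will arrive.
    while (item = queue.pop)
      buffer << item
      next if buffer.size < BATCH_SIZE
      sink.puts(buffer.join(','))   # the potentially slow dump
      buffer.clear
    end
    sink.puts(buffer.join(',')) unless buffer.empty?   # flush the remainder
  end
end

# Hypothetical producer P: pushes seven items without waiting on C's dumps.
def run_buffer_demo
  queue = Queue.new
  sink = StringIO.new
  consumer = start_buffering_consumer(queue, sink)
  1.upto(7) { |n| queue << n }
  queue << nil
  consumer.join
  sink.string
end
```

Calling `run_buffer_demo` returns the text C wrote: two full batches followed by the leftover item. P never blocks here because the queue is unbounded; with a `SizedQueue` it would block only when the queue is full.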
The answer by scriptin gives a reasonable solution, but doesn't answer your questions about why the implementation you had failed.
- You get a 'queue empty' error when you call pop on a Ruby queue in non-blocking mode (true passed) and there are no items on the queue.
- Your "producer" thread is created and your "consumer" thread is created, but they'll run independently and start executing whenever Ruby feels like it.
- If your "consumer" runs before the "producer" has pushed anything onto the queue, then obviously the "consumer" will pop an empty queue and throw an exception.
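The failure described in the points above is easy to reproduce in isolation. This is a small sketch; the helper name `try_pop` is made up for the example.

```ruby
# Hypothetical helper: attempt a non-blocking pop and report an empty queue
# as :empty instead of letting the exception propagate.
def try_pop(queue)
  queue.pop(true)     # true = non-blocking mode
rescue ThreadError    # raised when the queue has no items
  :empty
end

queue = Queue.new
puts try_pop(queue).inspect   # nothing has been pushed yet
queue << { artist: 'Someone', track: 'Something' }
puts try_pop(queue).inspect   # now there is an item to return
```

The first call hits the empty queue and is rescued; the second returns the Hash that was pushed.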
The solution to this one is easy; the consumer shouldn't use non-blocking mode. Don't pass true, just call pop(). The whole point of the consumer thread is that it can block on a queue waiting for the producer, without the rest of the application halting.
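As a quick illustration of the blocking behaviour, the sketch below starts the consumer first and pushes later. The method name `blocking_pop_demo` and the 0.1 second delay are arbitrary choices for the example.

```ruby
# Hypothetical demo: the consumer starts before anything is on the queue.
def blocking_pop_demo
  queue = Queue.new
  consumer = Thread.new { queue.pop }   # blocks until an item arrives
  sleep 0.1                             # give the consumer time to start waiting
  queue << :hello                       # the producer pushes late
  consumer.value                        # joins the thread and returns its result
end

puts blocking_pop_demo.inspect
```

Even though the consumer reaches pop well before the push happens, no exception is raised; the thread simply waits and then receives the item.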
As for calling lastfm.track.scrobble - yes, this is executing that method immediately and pushing the result on the queue, which is exactly not what you want :-D It is in fact the reason why your consumer thread "reliably" gets to pop its queue before the producer ever has a chance to push anything: even if the producer thread runs first, it hits a slow 'scrobble' call and sits waiting for HTTP requests/responses to complete, so the consumer thread executes and tries to read from the empty queue.
You want to push the parameters to scrobble onto the queue, then have the consumer thread do the scrobbling. It's "nicer"/more structured to make little classes encapsulating the work and push instances of those onto your queue, but a cheap-and-nasty solution just uses the Hash which scrobble takes as input. We also need to remember to loop over the queue entries inside the consumer, instead of only processing one! Thus (UNTESTED) we might have:
queue = SizedQueue.new(10)

# Producer: push only the scrobble parameters, not the result of scrobbling.
Thread.new do
  songs.each do |song|
    queue << { artist: song[:artist], track: song[:track] }
  end
  queue << "done"
end

# Consumer: block on the queue and do the slow scrobbling here.
consumer = Thread.new do
  loop do
    scrobble_args = queue.pop()
    Thread.exit if scrobble_args == "done"
    lastfm.track.scrobble(scrobble_args)
    puts "Scrobbled #{scrobble_args[:track]} by #{scrobble_args[:artist]}."
  end
end

consumer.join
This should technically work but is totally pointless, because that one lonely consumer thread is just processing the queue in serial. So let's simplify this. Really, you just want to dump all of 'songs' into the queue and have the consumer thread do the work. Since we want to run in parallel, we'll make an array of threads.
# I'm not bothering with the producer thread here so the queue size is unlimited.
# You could just as well put the 'songs.each' inside a producer thread with
# a sized queue, as per your original code, if you wanted.
#
queue = Queue.new

songs.each do | song |
  queue << song
end

consumers = []

1.upto( 10 ) do
  consumers << Thread.new do
    # Keep looping until asked to exit
    #
    loop do
      song = queue.pop()
      Thread.exit if song == 'done'
      lastfm.track.scrobble( artist: song[ :artist ], track: song[ :track ] )
      puts "Scrobbled #{ song[ :track ] } by #{ song[ :artist ] }."
    end
  end

  # Push one 'done' exit instruction for each consumer
  #
  queue << 'done'
end

# Wait for all consumer threads to complete
#
consumers.each do | consumer |
  consumer.join()
end
...and that should do the trick. The "Parallel" gem will probably be doing much the same sort of thing under the hood.
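For what it's worth, here is a runnable variation on the same worker-pool idea that swaps the 'done' markers for Queue#close, which I believe needs Ruby 2.3 or later. It is only a sketch: `fake_scrobble` is a made-up stand-in for lastfm.track.scrobble so that nothing touches the network, and `scrobble_all` and the default of four workers are likewise assumptions.

```ruby
# Hypothetical stand-in for lastfm.track.scrobble; does no network I/O.
def fake_scrobble(artist:, track:)
  "#{track} by #{artist}"
end

# Hypothetical helper: process every song using a small pool of threads.
def scrobble_all(songs, worker_count: 4)
  queue = Queue.new
  songs.each { |song| queue << song }
  queue.close               # no more pushes; pop returns nil once drained

  results = Queue.new       # thread-safe place to collect outcomes
  workers = Array.new(worker_count) do
    Thread.new do
      # pop blocks while items remain and returns nil when the closed
      # queue is empty, which ends the loop without any sentinel values.
      while (song = queue.pop)
        results << fake_scrobble(artist: song[:artist], track: song[:track])
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

songs = [{ artist: 'A', track: 'x' }, { artist: 'B', track: 'y' }]
puts scrobble_all(songs).sort
```

The order of results depends on thread scheduling, hence the sort in the usage line.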
Best Answer
First, the structure you want in this case is probably a priority queue. You'll use the expiration time of each item to determine its priority in the queue.
In this case, the consumer thread waits for one of two things to happen: the first item in the queue comes due, or a newly inserted item ends up at the front of the queue.
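A rough Ruby sketch of that waiting logic, using a Mutex and a ConditionVariable with a timed wait, might look like the following. The class name `ExpiryQueue` and its methods are invented for illustration, and a sorted Array stands in for a real priority queue (a heap would scale better).

```ruby
# Hypothetical ExpiryQueue: items become available at their expiration time.
class ExpiryQueue
  def initialize
    @items = []                       # [due_at, item] pairs, earliest first
    @mutex = Mutex.new
    @changed = ConditionVariable.new
  end

  def push(item, due_at)
    @mutex.synchronize do
      @items << [due_at, item]
      @items.sort_by!(&:first)
      # Wake the consumer only if the new item is now at the front, since
      # that is the one case where it must recompute how long to sleep.
      @changed.signal if @items.first[0] == due_at
    end
  end

  # Blocks until the earliest item is due, then removes and returns it.
  def pop
    @mutex.synchronize do
      loop do
        if @items.empty?
          @changed.wait(@mutex)             # wait for any insert
        else
          delay = @items.first[0] - Time.now
          return @items.shift[1] if delay <= 0
          @changed.wait(@mutex, delay)      # wait for due time or a new front item
        end
      end
    end
  end
end
```

A consumer thread would then just loop on `pop`. The timed wait covers the first condition (the front item coming due) and the signal in `push` covers the second (a new item arriving at the front); after either wake-up the loop rechecks the front item.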