Design – How does the consumer-producer solution work

design, design-patterns, multithreading, ruby

I'm only a beginner, and my book doesn't cover this subject. I have researched my problem and found that an implementation of the consumer-producer pattern is the ideal solution, and have Googled it, read what I could, tried to enact examples… but haven't been lucky. I would really appreciate a bit of guidance.

I am writing in Ruby, if that makes a difference.

Background

I am writing a script that scrobbles my music library backlog to Last.FM. For anyone who isn't familiar with that service, that simply means that I am making lots of POST requests to a HTTP API.

I start with an array, each element is a hash/dictionary like {artist: "The Cure", track: "Siamese Twins"}. I make the calls by iterating over the array and issuing the simple method call lastfm.track.scrobble artist: song[:artist], track: song[:track].

Problem

Doing this in a straightforward one-at-a-time blocking style works perfectly, but is very very slow, because I'm waiting ~2 seconds for each HTTP request to travel the world and return. I could finish 5 times faster if I sent 5 HTTP requests simultaneously.

I asked on Stack Overflow what the best solution would be. Split the array into five parts, and give each part a thread running one request at a time? Something like JavaScript's XMLHttpRequest, which has an event loop and a callback function? They told me that this question would be better suited to Programmers SE, and that I probably want the consumer-producer pattern. I looked it up, and it does sound like the kind of thing I need.

My Attempt in Ruby, based off a StackOverflow post

# "lastfm" is an object representing my Last.FM profile, 
# its .track.scrobble method handles a POST request and waits until it returns
# "songs" is an array of hashes like {artist: "The Cure", track: "One Hundred Years"}

queue = SizedQueue.new(10) # this will only allow 10 items on the queue at once

p1 = Thread.new do 
  songs.each do |song|
      scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track]
      puts "Scrobbled #{song[:track]} by #{song[:artist]}."
      queue << scrobble
  end
  queue << "done"
end

consumer = Thread.new do
  blocker = queue.pop(true) # don't block when zero items are in queue
  Thread.exit if blocker == "done"
  process(blocker)
end

# wait for the consumer to finish
consumer.join

My Error

Failure on the "pop" method call with error "queue empty".

I don't know enough about this stuff to really understand what's happening. It looks like one thread is filling a queue with API calls to make, never more than 10 at a time, while another thread is consuming from that queue and performing them. But why pop? Where is it ever actually executed? Why am I appending "done" to the queue?

I'm concerned about the line scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track]. Won't this call the lastfm.track.scrobble method outright, and store its return value as scrobble? Should I be using a proc or lambda instead, and calling that proc/lambda in the consumer?

Best Answer

The answer by scriptin gives a reasonable solution, but it doesn't answer your questions about why the implementation you had failed.

  • You get a 'queue empty' error when you call pop on a Ruby queue in non-blocking mode (true passed) and there are no items on the queue.
  • Your "producer" thread is created and your "consumer" thread is created, but they'll run independently and start executing whenever Ruby feels like it.
  • If your "consumer" runs before the "producer" has pushed anything onto the queue, then obviously the "consumer" will pop an empty queue and throw an exception.

The solution to this one is easy; the consumer shouldn't use non-blocking mode. Don't pass true, just call pop(). The whole point of the consumer thread is that it can block on a queue waiting for the producer, without the rest of the application halting.
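To make the difference between the two modes concrete, here is a minimal sketch that needs no Last.FM account. The variable names are made up for illustration; only Queue#pop and ThreadError come from Ruby itself.

```ruby
require 'thread' # harmless on modern Rubies, needed on older ones

queue = Queue.new

# Non-blocking pop (true passed) on an empty queue raises ThreadError.
begin
  queue.pop(true)
  nonblocking_result = :no_error
rescue ThreadError => e
  nonblocking_result = e.message
end

# Blocking pop (no argument) simply waits until another thread pushes.
producer = Thread.new do
  sleep 0.1        # pretend the producer is slow to get going
  queue << :item
end

blocking_result = queue.pop # sleeps here until :item arrives
producer.join

puts "non-blocking pop: #{nonblocking_result}"
puts "blocking pop:     #{blocking_result.inspect}"
```

The first pop fails for the same reason your consumer did; the second one waits for the producer, which is the behaviour you want in a consumer thread.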

As for calling lastfm.track.scrobble: yes, this executes that method immediately and pushes its result onto the queue, which is exactly what you don't want :-D It is also the reason why your consumer thread "reliably" gets to pop its queue before the producer has had a chance to push anything. Even if the producer thread runs first, it hits a slow 'scrobble' call and sits waiting for the HTTP request/response to complete, so the consumer thread executes in the meantime and tries to read from the still-empty queue.

You want to push the parameters to scrobble onto the queue, then have the consumer thread do the scrobbling. It's "nicer"/more structured to make little classes encapsulating the work and push instances of those onto your queue, but a cheap-and-nasty solution just uses the Hash which scrobble takes as input. We also need to remember to loop over the queue entries inside the consumer, instead of only processing one! Thus (UNTESTED) we might have:

queue = SizedQueue.new(10)

Thread.new do 
  songs.each do |song|
      queue << { artist: song[:artist], track: song[:track] }
  end
  queue << "done"
end

consumer = Thread.new do
  loop do
    scrobble_args = queue.pop()
    Thread.exit if scrobble_args == "done"
    lastfm.track.scrobble(scrobble_args)
    puts "Scrobbled #{scrobble_args[:track]} by #{scrobble_args[:artist]}."
  end
end

consumer.join
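For comparison, the "little classes" variant mentioned above might look roughly like the sketch below. ScrobbleJob, FakeTrackApi and the :done sentinel are all hypothetical names invented for this example; FakeTrackApi only stands in for lastfm.track so the code runs without network access, and it assumes scrobble accepts artist: and track: as in your original call.

```ruby
require 'thread'

# Hypothetical stand-in for lastfm.track; records calls instead of doing HTTP.
class FakeTrackApi
  attr_reader :scrobbled

  def initialize
    @scrobbled = []
  end

  def scrobble(artist:, track:)
    @scrobbled << "#{track} by #{artist}"
  end
end

# Hypothetical job class: it only stores the arguments. Nothing is sent
# until the consumer calls #run.
ScrobbleJob = Struct.new(:artist, :track) do
  def run(track_api)
    track_api.scrobble(artist: artist, track: track)
  end
end

track_api = FakeTrackApi.new
songs = [
  { artist: "The Cure", track: "Siamese Twins" },
  { artist: "The Cure", track: "One Hundred Years" }
]

queue = SizedQueue.new(10)

producer = Thread.new do
  songs.each { |song| queue << ScrobbleJob.new(song[:artist], song[:track]) }
  queue << :done # sentinel: tells the consumer there is nothing more to do
end

consumer = Thread.new do
  loop do
    job = queue.pop # blocks until the producer has pushed something
    break if job == :done
    job.run(track_api)
  end
end

[producer, consumer].each(&:join)
puts track_api.scrobbled
```

The point of the class is that creating a job is cheap and instant, so the producer never waits on HTTP; the slow call happens only where the consumer runs the job.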

This should technically work but is totally pointless, because that one lonely consumer thread is just processing the queue serially. So let's simplify this. Really, you just want to dump all of 'songs' into the queue and have several consumer threads do the work. Since we want to run in parallel, we'll make an array of threads.

# I'm not bothering with the producer thread here so the queue size is unlimited.
# You could just as well put the 'songs.each' inside a producer thread with
# a sized queue, as per your original code, if you wanted.
#
queue = Queue.new
songs.each do | song |
  queue << song
end

consumers = []

1.upto( 10 ) do
  consumers << Thread.new do

    # Keep looping until asked to exit
    #
    loop do
      song = queue.pop()
      Thread.exit if song == 'done'
      lastfm.track.scrobble( artist: song[ :artist ], track: song[ :track ] )
      puts "Scrobbled #{ song[ :track ] } by #{ song[ :artist ] }."
    end
  end

  # Push one 'done' exit instruction for each consumer
  #
  queue << 'done'
end

# Wait for all consumer threads to complete
#
consumers.each do | consumer |
  consumer.join()
end

...and that should do the trick. The "Parallel" gem will probably be doing much the same sort of thing under the hood.
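If you are on Ruby 2.3 or later, a variation on the same idea is to close the queue instead of pushing one 'done' marker per consumer: once a closed queue is empty, pop returns nil rather than blocking. The sketch below is an alternative, not what the code above does, and fake_scrobble is a hypothetical stand-in that just sleeps to imitate the HTTP round trip.

```ruby
require 'thread'

# Hypothetical replacement for lastfm.track.scrobble so this runs offline.
fake_scrobble = lambda do |artist:, track:|
  sleep 0.05 # imitate network latency
  "Scrobbled #{track} by #{artist}."
end

songs = Array.new(20) { |i| { artist: "Artist #{i}", track: "Track #{i}" } }

queue = Queue.new
songs.each { |song| queue << song }
queue.close # no more pushes; pop returns nil once the queue is drained

results = Queue.new # Queue is thread-safe, so workers can push into it freely

started = Time.now
workers = Array.new(5) do
  Thread.new do
    # The loop ends when pop returns nil, i.e. closed queue and nothing left.
    while (song = queue.pop)
      results << fake_scrobble.call(artist: song[:artist], track: song[:track])
    end
  end
end

workers.each(&:join)
elapsed = Time.now - started

puts "#{results.size} scrobbles in #{elapsed.round(2)}s using #{workers.size} threads"
```

Changing the 5 to 1 and running it again is a quick way to see how much the extra threads help when each call spends most of its time waiting.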
