I'm trying to understand a little bit of what's going on behind the scenes when using the apply_async method of a multiprocessing pool.
Who runs the callback method? Is it the main process that called apply_async?
Let's say I send out a whole bunch of apply_async commands with callbacks and then continue with my program. My program is still doing things when the apply_async calls start to finish. How does the callback get run by the "main process" while the main process is still busy with the script?
Here's an example.
import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()
args = range(20)
for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)
t0 = time.time()
while time.time() - t0 < 60:
    pass
print 'Finished with the script'
The output is something like
PoolWorker-1 running func with arg 0
PoolWorker-2 running func with arg 1
PoolWorker-3 running func with arg 2
MainProcess going to sleep for a minute <– main process is busy
PoolWorker-4 running func with arg 3
PoolWorker-1 running func with arg 4
PoolWorker-2 running func with arg 5
PoolWorker-3 running func with arg 6
PoolWorker-4 running func with arg 7
MainProcess running callback with arg 0 <– main process running callback while it's still in the while loop!!
MainProcess running callback with arg 1
MainProcess running callback with arg 2
MainProcess running callback with arg 3
MainProcess running callback with arg 4
PoolWorker-1 running func with arg 8
…
Finished with the script
How is MainProcess running the callback while it's in the middle of that while loop??
There is this statement about the callback in the documentation for multiprocessing.Pool that seems like a hint but I don't understand it.
apply_async(func[, args[, kwds[, callback]]])
A variant of the apply() method which returns a result object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.
Best Answer
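There is indeed a hint in the docs: "callback should complete immediately since otherwise the thread which handles the results will get blocked."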
The callbacks are handled in the main process, but they're run in their own separate thread. When you create a Pool, it actually creates a few Thread objects internally. The interesting thread for us is _result_handler; we'll get to why shortly.

Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child. Its _set method is the one that ends up actually executing the callback passed in, assuming the task was successful. Also notice that it adds itself to a global cache dict at the end of __init__.

Now, back to the _result_handler thread object. That object calls the _handle_results function. It's a loop that just pulls results from the children out of a queue, finds the entry for each one in cache, and calls _set, which executes our callback. It's able to run even though you're in a loop because it isn't running in the main thread.
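To make the mechanism concrete, here is a toy model of it, written for Python 3 (in Python 2 the queue module is named Queue). It is not the CPython source: PendingResult, handle_results, result_queue and the thread name "result-handler" are hypothetical stand-ins for ApplyResult, _handle_results, the pool's result queue and the _result_handler thread, and the worker processes are faked by putting results on the queue directly. It only illustrates the queue, cache and _set pattern described above.

```python
import queue
import threading

# Hypothetical stand-ins for the pool's internals; not CPython source.
cache = {}                      # job id -> PendingResult
result_queue = queue.Queue()    # finished results arrive here


class PendingResult(object):
    """Toy counterpart of ApplyResult: remembers the callback for one job."""

    def __init__(self, job_id, callback):
        self.job_id = job_id
        self.callback = callback
        self.done = threading.Event()
        self.value = None
        cache[job_id] = self        # register so the handler can find us

    def _set(self, value):
        self.value = value
        if self.callback is not None:
            self.callback(value)    # runs in whichever thread calls _set
        del cache[self.job_id]
        self.done.set()


def handle_results():
    """Toy result handler: process (job_id, value) pairs until a None sentinel."""
    while True:
        item = result_queue.get()
        if item is None:
            break
        job_id, value = item
        cache[job_id]._set(value)


seen = []   # (thread name, value) pairs recorded by the callback


def callback(value):
    seen.append((threading.current_thread().name, value))


handler = threading.Thread(target=handle_results, name="result-handler")
handler.daemon = True
handler.start()

pending = [PendingResult(i, callback) for i in range(3)]
for i in range(3):
    result_queue.put((i, i * i))    # pretend a worker finished job i

for p in pending:
    p.done.wait()                   # the main thread could be busy here instead
result_queue.put(None)              # tell the handler to stop
handler.join()

print(seen)
# [('result-handler', 0), ('result-handler', 1), ('result-handler', 4)]
```

Every callback is recorded under the handler thread's name rather than MainThread, which is the same reason the callbacks in the question can fire while the main thread is stuck in its while loop. It also suggests why a slow callback is a problem: while it runs, the single handler thread cannot process any other result.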