I have a task which I aim to parallelize with the help of the joblib library. The function is fairly slow when run sequentially, so I tried using parallelization to speed up the process.
with Parallel(n_jobs=-1, verbose=100) as parallel:
    test = parallel(delayed(create_time_series_capacity_v4)(unit) for unit in block_info.UnitID.unique())
out_data = pd.concat([out_data, test[test.columns[1]]], axis=1)
block_info.UnitID.unique() has approximately 1000 entries, and creating the time series takes longer for some units than for others. This leads me to think that some workers sit idle while others are performing an intensive task. Is there a way to reuse the available processes rather than leaving them idle? I have pasted below what the code prints while executing:
UNIT05-001 has been written
UNIT04-001 has been written
UNIT05-003 has been written
[Parallel(n_jobs=-1)]: Done 1 tasks | elapsed: 0.2s
[Parallel(n_jobs=-1)]: Done 2 out of 10 | elapsed: 0.2s remaining: 1.2s
[Parallel(n_jobs=-1)]: Done 3 out of 10 | elapsed: 0.2s remaining: 0.7s
UNIT05-004 has been written
[Parallel(n_jobs=-1)]: Done 4 out of 10 | elapsed: 0.4s remaining: 0.7s
UNIT05-002 has been written
[Parallel(n_jobs=-1)]: Done 5 out of 10 | elapsed: 0.6s remaining: 0.6s
UNIT02-001 has been written
[Parallel(n_jobs=-1)]: Done 6 out of 10 | elapsed: 27.9s remaining: 18.5s
UNIT01-001 has been written
[Parallel(n_jobs=-1)]: Done 7 out of 10 | elapsed: 50.4s remaining: 21.5s
CodePudding user response:
I am not that familiar with joblib, but I quickly perused the documentation. It appears that you are using the default "multiprocessing" backend, which is based on Python's multiprocessing.Pool implementation, of which I do know a bit. This class creates a pool of processes, as you would expect. Your 1000 tasks are placed on a "task queue" (see Chunking below). Each process in the pool is initially idle, so each removes a task from the queue and executes it. When a process has finished executing a task, it becomes idle again and goes back to retrieve the next task from the queue. This continues until there are no more tasks on the queue, at which point all the processes remain idle until another task is added.
What we cannot assume in general is that every task takes an equal amount of time to run. But for the sake of argument, let's assume that all tasks take an equal amount of time to execute. If you submit 1000 tasks to be handled by 16 processes, then after each process has executed 62 tasks (16 * 62 = 992) there will be only 8 tasks left on the task queue. At that point 8 processes will remain idle while the other 8 execute the final 8 tasks. Unless these tasks are very long running, you would see all 16 processes going idle, and staying idle, more or less at the same time.

Now let us assume that all tasks take the same amount of time except the very last task submitted, which takes 15 minutes longer to execute. You would then expect to see 15 processes going idle more or less at the same time, with the 16th process taking an extra 15 minutes before it goes idle. But if this extra long-running task were the first task submitted, again you would expect to see all the processes going and staying idle at the same time. Of course, the process that executed the very long-running task will end up processing fewer tasks than the others, under the assumption that the other tasks take far less time to complete.
Chunking
The multiprocessing.Pool supports chunking; whether joblib uses this capability I cannot determine. But this is the way it works:
Since reading and writing to the task queue can be rather expensive, the pool can batch the submitted tasks into chunks of a certain size to reduce the number of queue operations. That is, instead of writing 1000 tasks to the queue one at a time, the pool might write the tasks in chunks of, say, 16 tasks. Thus, when a process becomes idle, it gets from the task queue the next chunk containing 16 tasks. The process will not become idle again until all 16 tasks have been executed, and only then will it try to get the next chunk.
So, if a chunk size of 16 were being used, there would be 62 chunks of size 16 (16 * 62 = 992 tasks) placed on the queue plus a final chunk of 8 tasks, for a total of 63 chunks. After each of the 16 processes has executed 3 chunks (16 * 3 = 48 chunks), there would be 15 chunks left on the queue, so one process would go idle immediately. Of the 15 processes left handling the 15 chunks, remember that one of the chunks contains only 8 tasks rather than 16. That process will go idle before the other 14, each of which will still have 8 more tasks to execute in its final chunk.
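The effect of a chunk size can be made visible with multiprocessing.Pool (the helper names here are my own). Each task reports which worker process handled it; with chunksize=4, tasks 0-3 are all dispatched as one unit to a single worker:

```python
import multiprocessing as mp

def label_worker(x):
    # return which process handled each task, to make chunking visible
    return (x, mp.current_process().name)

def run(chunksize):
    with mp.Pool(processes=4) as pool:
        # map() splits range(16) into chunks of `chunksize` tasks;
        # each chunk is handed to exactly one worker process
        return pool.map(label_worker, range(16), chunksize=chunksize)

if __name__ == "__main__":
    # with chunksize=4, tasks 0-3 all land on one worker, 4-7 on another, etc.
    for task, worker in run(chunksize=4):
        print(task, worker)
```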
Conclusion
The examples above, based on tasks that all take the same time to run, are not very realistic, but they should still give you an idea of how things work.
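If batching does turn out to be the cause of your idle workers, joblib lets you control it: Parallel accepts a batch_size argument (default 'auto') and a pre_dispatch argument that limits how many tasks are queued ahead of time. A minimal sketch, with square standing in for your create_time_series_capacity_v4:

```python
from joblib import Parallel, delayed

def square(x):
    return x * x

# batch_size=1 forces joblib to dispatch tasks one at a time, so an
# idle worker always picks up the next pending task rather than a
# pre-assigned chunk that may contain slow tasks
results = Parallel(n_jobs=-1, batch_size=1, pre_dispatch="2*n_jobs")(
    delayed(square)(i) for i in range(10)
)
```

Note that Parallel returns results in the order the tasks were submitted, regardless of which worker finished first.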