Here's a timed example of multiple image arrays of different sizes being saved in a loop as well as concurrently using threads / processes:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import numpy as np
from cv2 import cv2


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4
    t1 = perf_counter()
    for ll in l1, l2, l3:
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            with executor(workers) as ex:
                futures = [
                    ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )
And I get these durations on my i5 mbp:
Time for 100: 0.09495482999999982 seconds
Time for 100 (ThreadPoolExecutor): 0.14151873999999998 seconds
Time for 100 (ProcessPoolExecutor): 1.5136184309999998 seconds
Time for 1000: 0.36972280300000016 seconds
Time for 1000 (ThreadPoolExecutor): 0.619205703 seconds
Time for 1000 (ProcessPoolExecutor): 2.016624468 seconds
Time for 10000: 4.232915643999999 seconds
Time for 10000 (ThreadPoolExecutor): 7.251599262 seconds
Time for 10000 (ProcessPoolExecutor): 13.963426469999998 seconds
Aren't threads / processes expected to take less time to achieve the same thing? Why is that not the case here?
CodePudding user response:
The timings in the code are wrong because the timer t is not reset before testing the pools, so each pool's time also includes everything measured before it. Nevertheless, the relative order of the timings is correct. A possible version with a timer reset is:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import cv2
import numpy as np


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4
    for ll in l1, l2, l3:
        # Baseline: plain loop
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            # Reset the timer so each pool is measured on its own
            t = perf_counter()
            with executor(workers) as ex:
                futures = [
                    ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )
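One way to make a missed timer reset harder to repeat is to wrap every measured section in its own timer. The following is only a sketch: timed is a hypothetical helper (not part of the code above), and the summed ranges are placeholder work standing in for the loop and the pools.

```python
from contextlib import contextmanager
from time import perf_counter


@contextmanager
def timed(label, results=None):
    # Each "with" block gets a fresh start time, so nothing carries over
    # from a previously measured section.
    start = perf_counter()
    try:
        yield
    finally:
        elapsed = perf_counter() - start
        if results is not None:
            results[label] = elapsed
        print(f'Time for {label}: {elapsed} seconds')


if __name__ == '__main__':
    results = {}
    with timed('first section', results):
        sum(range(100_000))  # placeholder work
    with timed('second section', results):
        sum(range(100_000))  # placeholder work
```

With this shape, the loop and each executor would simply go into separate with timed(...) blocks.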
Multithreading is faster especially for I/O-bound tasks. In this case, compressing the images is CPU-intensive, so depending on the implementation of OpenCV and of the Python wrapper, multithreading can be much slower. In many cases the culprit is CPython's GIL, but I am not sure if this is the case here (I do not know if the GIL is released during the imwrite call). In my setup (i7 8th gen), threading is as fast as the loop for 100 images and barely faster for 1000 and 10000 images. If ThreadPoolExecutor reuses threads, there is an overhead involved in assigning a new task to an existing thread. If it does not reuse threads, there is an overhead involved in launching a new thread.
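To illustrate the I/O-bound case where threads do help, here is a small sketch that uses time.sleep as a stand-in for waiting on disk or network (sleep releases the GIL). The function names are made up for the illustration, and it says nothing about how imwrite itself behaves.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def fake_io_task(delay):
    # Stand-in for an I/O wait; the thread is idle and the GIL is released.
    sleep(delay)
    return delay


def run_sequential(delays):
    # Run the tasks one after another and return the elapsed time.
    t = perf_counter()
    for d in delays:
        fake_io_task(d)
    return perf_counter() - t


def run_threaded(delays, workers=4):
    # Run the same tasks on a thread pool and return the elapsed time.
    t = perf_counter()
    with ThreadPoolExecutor(workers) as ex:
        list(ex.map(fake_io_task, delays))
    return perf_counter() - t


if __name__ == '__main__':
    delays = [0.05] * 8
    print(f'Sequential: {run_sequential(delays)} seconds')
    print(f'Threaded:   {run_threaded(delays)} seconds')
```

Because the waits overlap, the threaded run should finish well ahead of the sequential one. A CPU-bound task that holds the GIL would not show this gain.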
Multiprocessing circumvents the GIL issue, but has some other problems. First, pickling the data to pass between processes takes some time, and in the case of images it can be very expensive. Second, on Windows, spawning a new process takes a lot of time. A simple test to see the overhead (both for processes and threads) is to replace the save_img function with one that does nothing but still needs pickling, etc.:
def save_img(idx, image, dst):
    if idx != idx:
        print("impossible!")
and by a similar one without parameters to see the overhead of spawning the processes, etc.
The timings in my setup show that 2.3 seconds are needed just to spawn the 10000 processes and 0.6 extra seconds for pickling, which is much more than the time needed for processing.
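The two overhead measurements described above could be set up roughly as follows. This is a sketch under assumptions: noop, noop_with_args and time_submissions are hypothetical names, and a bytes object stands in for the image so the example needs only the standard library. The numbers it prints will differ from the ones quoted above.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from time import perf_counter


def noop():
    # No parameters: only the cost of scheduling the task is measured.
    return None


def noop_with_args(idx, payload, dst):
    # Same signature as save_img, but does no work; for a process pool
    # the arguments still have to be pickled and sent to the worker.
    return None


def time_submissions(executor_cls, n_tasks, workers=4, payload=None):
    # Submit n_tasks do-nothing tasks and return the total elapsed time.
    t = perf_counter()
    with executor_cls(workers) as ex:
        if payload is None:
            futures = [ex.submit(noop) for _ in range(n_tasks)]
        else:
            futures = [
                ex.submit(noop_with_args, i, payload, 'unused')
                for i in range(n_tasks)
            ]
        for f in as_completed(futures):
            f.result()
    return perf_counter() - t


if __name__ == '__main__':
    payload = bytes(50 * 50)  # stand-in payload, not a real image
    for cls in ThreadPoolExecutor, ProcessPoolExecutor:
        bare = time_submissions(cls, 1000)
        with_args = time_submissions(cls, 1000, payload=payload)
        print(f'{cls.__name__}: no args {bare} s, with args {with_args} s')
```

The difference between the two timings for the process pool gives a rough idea of what passing the arguments costs on top of scheduling.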
A way to improve the throughput and keep the overhead to a minimum is to break the work into chunks, and submit each chunk to a worker:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import cv2
import numpy as np


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


def multi_save_img(idx_start, images, dst):
    # Save a whole chunk of images, numbering them from idx_start
    for idx, image in zip(range(idx_start, idx_start + len(images)), images):
        cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4
    for ll in l1, l2, l3:
        # Baseline: plain loop
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        # One chunk per worker; the last chunk absorbs any remainder
        chunk_size = len(ll) // workers
        starts = [chunk_size * k for k in range(workers)]
        ends = [chunk_size * (k + 1) for k in range(workers)]
        ends[-1] = len(ll)
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            t = perf_counter()
            with executor(workers) as ex:
                futures = [
                    ex.submit(multi_save_img, start, ll[start:end], temp_dir)
                    for (start, end) in zip(starts, ends)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )
This should give you a significant boost over a simple for loop, with both the multiprocessing and the multithreading approach.
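The chunk boundaries can also be computed so that any remainder is spread over the chunks instead of being handled as a special case. A minimal sketch, where chunk_bounds is a hypothetical helper and not something provided by concurrent.futures:

```python
def chunk_bounds(n_items, n_chunks):
    # Split range(n_items) into at most n_chunks contiguous (start, end)
    # pairs whose sizes differ by at most one; empty chunks are dropped.
    base, extra = divmod(n_items, n_chunks)
    bounds = []
    start = 0
    for k in range(n_chunks):
        size = base + (1 if k < extra else 0)
        if size == 0:
            break
        bounds.append((start, start + size))
        start += size
    return bounds


if __name__ == '__main__':
    print(chunk_bounds(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]
```

Each (start, end) pair can then be passed as ex.submit(multi_save_img, start, ll[start:end], temp_dir), as in the code above.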