How do I add tqdm to the multiprocessing for loop here? Namely, I want to wrap urls in tqdm():
jobs = []
urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
for url in urls:
    job = pool.apply_async(worker, (url, q))
    jobs.append(job)
for job in jobs:
    job.get()
pool.close()
pool.join()
The suggested solution on GitHub is this:
pbar = tqdm(total=100)

def update(*a):
    pbar.update()
    # tqdm.write(str(a))

for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()
But my iterable is a list of URLs rather than a range as in the example above. How do I translate that solution to my for loop?
CodePudding user response:
You can use Parallel and delayed from joblib together with tqdm in the following manner:
from multiprocessing import cpu_count
from joblib import Parallel, delayed
from tqdm import tqdm
import pandas as pd

def process_urls(urls, i):
    # define your function here
    pass
Call function using:
urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
Parallel(n_jobs=cpu_count(), prefer='processes')(
    delayed(process_urls)(urls, i) for i in tqdm(range(len(urls)))
)
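For reference, here is a minimal end-to-end sketch of this approach that passes each URL directly instead of an index; the body of process_urls (a simple HTTP GET via requests) and the urls.csv path are illustrative assumptions, not part of the original question:

from multiprocessing import cpu_count
from joblib import Parallel, delayed
from tqdm import tqdm
import pandas as pd
import requests  # assumed here only to give the example a body

def process_urls(url):
    # Illustrative placeholder: fetch the URL and return its status code.
    return requests.get(url, timeout=10).status_code

if __name__ == '__main__':
    urls = pd.read_csv('urls.csv', header=None).to_numpy().flatten()
    # Wrapping the input generator in tqdm advances the bar as tasks are
    # dispatched (not as they complete); Parallel then blocks until all
    # results are available.
    results = Parallel(n_jobs=cpu_count(), prefer='processes')(
        delayed(process_urls)(url) for url in tqdm(urls)
    )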
CodePudding user response:
The easiest solution that is compatible with your current code is to just specify the callback argument to apply_async (and if there is a possibility of an exception in worker, then specify the error_callback argument too).
from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    # So that the progress bar does not proceed too quickly
    # for demo purposes:
    import time
    time.sleep(1)

# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    def my_callback(result):
        pbar.update()

    # for this demo:
    #urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
    urls = list('abcdefghijklmnopqrstuvwxyz')
    with tqdm(total=len(urls)) as pbar:
        pool = Pool()
        jobs = [
            pool.apply_async(worker, (url,), callback=my_callback, error_callback=my_callback)
            for url in urls
        ]
        # You can delete the next two statements if you don't need
        # to save the value of job.get() since the calls to
        # pool.close() and pool.join() will wait for all submitted
        # tasks to complete:
        for job in jobs:
            job.get()
        pool.close()
        pool.join()
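If worker can raise, you may want to handle failures separately from successes. A minimal sketch (assuming you only want to collect the exceptions while still advancing the bar so it reaches 100%):

errors = []

def my_error_callback(exc):
    # exc is the exception raised inside worker
    errors.append(exc)
    pbar.update()

Define it next to my_callback and pass error_callback=my_error_callback to apply_async instead of reusing my_callback there.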
Or instead of using apply_async, use imap (or imap_unordered if you do not care about the order in which the results are returned):
from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    import time
    time.sleep(1)  # so that the progress bar does not proceed too quickly
    return url

# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    # for this demo:
    #urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
    urls = list('abcdefghijklmnopqrstuvwxyz')
    pool = Pool()
    results = list(tqdm(pool.imap(worker, urls), total=len(urls)))
    print(results)
    pool.close()
    pool.join()
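One further note: for a long list of URLs, imap also accepts a chunksize argument, and a value larger than the default of 1 reduces inter-process communication overhead. For example (the value 10 here is an arbitrary illustration):

results = list(tqdm(pool.imap(worker, urls, chunksize=10), total=len(urls)))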