How to add tqdm here?

Time:01-25

How do I add tqdm to the multiprocessing for loop here? Specifically, I want to wrap urls in tqdm():

jobs = []
urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
for url in urls:
    job = pool.apply_async(worker, (url, q))
    jobs.append(job)

for job in jobs:
    job.get()

pool.close()
pool.join()

The suggested solution on GitHub is this:

pbar = tqdm(total=100)
def update(*a):
    pbar.update()
    # tqdm.write(str(a))
for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()

But my iterable is a list of URLs as opposed to a range like in the above. How do I translate the above solution to my for loop?

CodePudding user response:

You can use Parallel and delayed from joblib and wrap the iterable in tqdm as follows:

from multiprocessing import cpu_count

import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

def process_urls(urls, i):
    # define your function here (it receives the full array and an index)
    pass

Call function using:

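urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
# urls is a flat NumPy array, so use len(urls); it has no .axes attribute.
# The bar advances as tasks are dispatched to the workers.
Parallel(n_jobs=cpu_count(), prefer='processes')(
    delayed(process_urls)(urls, i) for i in tqdm(range(len(urls)))
)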

CodePudding user response:

The easiest solution that is compatible with your current code is to just specify the callback argument to apply_async (and if there is a possibility of an exception in worker, then specify the error_callback argument too).

from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    # So that the progress bar does not advance too quickly
    # for demo purposes:
    import time
    time.sleep(1)

# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    def my_callback(result):
        pbar.update()

    # for this demo:
    #urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
    urls = list('abcdefghijklmnopqrstuvwxyz')
    with tqdm(total=len(urls)) as pbar:
        pool = Pool()
        jobs = [
            pool.apply_async(worker, (url,), callback=my_callback, error_callback=my_callback)
                for url in urls
        ]

        # You can delete the next two statements if you don't need
        # to save the value of job.get(), since the calls to
        # pool.close() and pool.join() will wait for all submitted
        # tasks to complete. Note that job.get() re-raises any
        # exception raised in worker:
        for job in jobs:
            job.get()

        pool.close()
        pool.join()
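If you want to tell successes apart from failures, one option is to pass different functions for callback and error_callback. The following is only a sketch under assumptions: the worker body, the helper name run_with_callbacks and the processes=2 default are hypothetical and not part of the original code.

```python
from multiprocessing import Pool
from tqdm import tqdm


def worker(url):
    # Hypothetical worker: raises on an empty URL so error_callback is exercised.
    if not url:
        raise ValueError('empty URL')
    return len(url)


def run_with_callbacks(urls, processes=2):
    results, errors = [], []
    with tqdm(total=len(urls)) as pbar:
        def on_success(result):
            # Called in the parent process with the worker's return value.
            results.append(result)
            pbar.update()

        def on_error(exc):
            # Called in the parent process with the exception raised in worker.
            errors.append(exc)
            pbar.update()

        pool = Pool(processes)
        for url in urls:
            pool.apply_async(worker, (url,),
                             callback=on_success, error_callback=on_error)
        pool.close()
        pool.join()  # wait for all submitted tasks to complete
    return results, errors


# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    results, errors = run_with_callbacks(['ab', '', 'abc'])
    print(sorted(results), errors)
```

The bar still reaches 100% when some tasks fail, and the errors list shows which exceptions occurred without calling job.get() on every job.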

Or instead of using apply_async, use imap (or imap_unordered if you do not care either about the results or the order of the results):

from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    import time

    time.sleep(1)  # so that the progress bar does not advance too quickly
    return url


# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    # for this demo:
    #urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
    urls = list('abcdefghijklmnopqrstuvwxyz')
    pool = Pool()
    results = list(tqdm(pool.imap(worker, urls), total=len(urls)))
    print(results)
    pool.close()
    pool.join()
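For completeness, here is roughly what the imap_unordered variant could look like. Treat it as a sketch: the worker body and the helper name run_unordered are made up for illustration, and the results come back in completion order rather than input order.

```python
from multiprocessing import Pool
from tqdm import tqdm


def worker(url):
    # Hypothetical stand-in for the real per-URL work.
    return url.upper()


def run_unordered(urls, processes=2):
    # The with-block terminates the pool on exit; by then list() has
    # already consumed every result.
    with Pool(processes) as pool:
        # imap_unordered yields each result as soon as it is ready, so
        # the bar advances on completion rather than in input order.
        return list(tqdm(pool.imap_unordered(worker, urls), total=len(urls)))


# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    urls = list('abcdefghijklmnopqrstuvwxyz')
    results = run_unordered(urls)
    print(sorted(results))
```

With imap, one slow task at the front of the list holds the bar back until it finishes; imap_unordered avoids that, at the cost of losing the input order.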