Python3: Limit Queue produced by Thread

Time:01-27

I'm running both I/O-bound and CPU-bound tasks. Multiprocessing is used to crawl websites pulled from a multiprocessing.Queue() and parse their HTML to extract links, while threading is used to read a text file containing a list of subdomains from giant marketplace sites (shopee.com and tokopedia.com).

import multiprocessing
from threading import Thread
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count()*10)

def consumer():
    while True:
        task = tasker.get()
        if task is None:
            break
        print(task)

def adding_task():
    with open('file.txt', 'r') as f:
        for line in f:
            tasker.put(line.strip())
    for i in range(cpu_count()):
        tasker.put(None)

def producer():
    add_task = Thread(target = adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target = consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    
    #Suspected problem.
    add_task.join()

producer()

The problem is: the queue fills up faster than the multiprocessing workers can finish their tasks. Currently, I'm using this to check whether the queue is empty:

import multiprocessing
from threading import Thread
from netaddr import IPNetwork
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count()*10)
ips = '173.245.48.0/20'

def consumer():
    while True:
        task = tasker.get()
        if task is None:
            break
        print(task)

#Busy-wait until the queue is fully drained (tasker.full() would check the opposite)
def check_tasker():
    while not tasker.empty():
        pass

def adding_task():
    for ip in IPNetwork(ips):
        check_tasker()
        tasker.put(str(ip))
    for i in range(cpu_count()):
        tasker.put(None)

def producer():
    add_task = Thread(target = adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target = consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    
    add_task.join()
    exit()

producer()

Is there a better way to temporarily pause the producer thread once the multiprocessing.Queue reaches N tasks, and resume it once the count drops?

CodePudding user response:

The queue is already bounded:

tasker = Queue(cpu_count()*10)

That's the only limit you need. Queue.put() blocks automatically once the queue holds cpu_count()*10 items and resumes as soon as a consumer frees a slot, so the producer thread is throttled for free; the busy-waiting check_tasker() loop is unnecessary, and counterproductive, since it waits for the queue to drain completely before adding anything. (OS-level limits on the size of the underlying pipe may cause put() to block even before the queue reaches its nominal capacity.)
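A minimal sketch of the same producer/consumer layout relying only on the blocking put (the names run and consumer, and the toy "double each item" work, are illustrative, not from the question):

```python
from multiprocessing import Process, Queue, cpu_count

def consumer(tasks, results):
    # Pull tasks until the None sentinel arrives.
    while True:
        task = tasks.get()
        if task is None:
            break
        results.put(task * 2)  # stand-in for real work

def run(items, n_workers=None):
    n_workers = n_workers or cpu_count()
    tasks = Queue(n_workers * 10)  # bounded: put() blocks when full
    results = Queue()
    procs = [Process(target=consumer, args=(tasks, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    items = list(items)
    for item in items:
        tasks.put(item)  # blocks here once the queue is full; no polling needed
    for _ in range(n_workers):
        tasks.put(None)  # one sentinel per worker
    # Drain results before joining, so workers can always flush their queues.
    out = [results.get() for _ in items]
    for p in procs:
        p.join()
    return out
```

Draining the results queue before joining the workers avoids the documented deadlock where a child process cannot exit while it still has unflushed data queued.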
