I'm running both an I/O-bound and a CPU-bound task: multiprocessing
is used to crawl websites taken from a multiprocessing.Queue()
and parse their HTML to extract links, while threading
is used to read a text file that contains a list of subdomains from large marketplace sites (shopee.com and tokopedia.com):
import multiprocessing
from threading import Thread
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count() * 10)

def consumer():
    while True:
        task = tasker.get()
        if task is None:
            break
        print(task)

def adding_task():
    with open('file.txt', 'r') as f:
        for line in f:
            tasker.put(line.strip())
    for i in range(cpu_count()):
        tasker.put(None)

def producer():
    add_task = Thread(target=adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    # Suspected problem.
    add_task.join()

producer()
The problem is that the Queue
grows faster than the multiprocessing
consumers can finish the tasks. Currently, I'm using this to check whether the queue is empty:
import multiprocessing
from threading import Thread
from netaddr import IPNetwork
from multiprocessing import cpu_count, Process, Queue

tasker = Queue(cpu_count() * 10)
ips = '173.245.48.0/20'

def consumer():
    while True:
        task = tasker.get()
        if task is None:
            break
        print(task)

# Check if Queue is empty or full: tasker.full()
def check_tasker():
    while True:
        if tasker.empty():
            break
        pass

def adding_task():
    for ip in IPNetwork(ips):
        check_tasker()
        tasker.put(str(ip))
    for i in range(cpu_count()):
        tasker.put(None)

def producer():
    add_task = Thread(target=adding_task)
    add_task.start()
    procs = []
    for i in range(cpu_count()):
        p = Process(target=consumer)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    add_task.join()
    exit()

producer()
Is there a better way to temporarily pause the threading
producer once the multiprocessing.Queue
has reached N tasks, and resume once the count decreases?
CodePudding user response:
The queue is already bounded:
tasker = Queue(cpu_count()*10)
That's the only limit you need. When the queue is at capacity, tasker.put() blocks until a consumer takes an item, so the producer thread pauses automatically and the busy-wait in check_tasker() is unnecessary. (OS-level limits on the size of the underlying pipe may block put() even before the queue reaches its nominal capacity.)
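A minimal sketch of the same producer/consumer setup relying only on the bounded queue, with check_tasker() removed; it assumes a fork start method (as in the question's code), and 'file.txt' and the queue size are just placeholders:

from multiprocessing import cpu_count, Process, Queue
from threading import Thread

# Bounded queue: put() blocks whenever cpu_count()*10 items are waiting.
tasker = Queue(cpu_count() * 10)

def consumer():
    while True:
        task = tasker.get()          # blocks until an item is available
        if task is None:             # sentinel: no more work
            break
        print(task)

def adding_task():
    # put() blocks on its own when the queue is full and resumes as
    # consumers drain items, so no explicit empty/full polling is needed.
    with open('file.txt', 'r') as f:   # placeholder input file
        for line in f:
            tasker.put(line.strip())
    for _ in range(cpu_count()):
        tasker.put(None)             # one sentinel per consumer process

def producer():
    add_task = Thread(target=adding_task)
    add_task.start()
    procs = [Process(target=consumer) for _ in range(cpu_count())]
    for p in procs:
        p.start()
    add_task.join()                  # feeder thread finishes first
    for p in procs:
        p.join()

if __name__ == '__main__':
    producer()

Note that the feeder thread is joined before the consumer processes here; since put() blocks while the consumers are still draining the queue, joining in the opposite order can leave the thread stuck behind a full queue.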