I'm facing a dilemma about how to process a very large file line by line with multiprocessing. The plan is to send a request for each line. The file contains a list of subdomains, 1-10 million lines; although the file is under 1 GB, I'm worried that my implementation will exhaust my memory.
I'm using Queue() to distribute the tasks to each process. Here's what I do:
from multiprocessing import Queue

tasker = Queue()
with open(processor, 'r') as f:
    for line in f:
        tasker.put(line.strip())
Multiprocessing eventually slows down, which I assume is because the Queue() grows faster than the processes can consume it. As an alternative, I use itertools.islice() to read a small batch of lines (the current line plus the next 4) and put them on the queue right away.
import re
from itertools import islice
from multiprocessing import Queue

tasker = Queue()
with open('file.txt', 'r') as f:
    for line in f:
        # current line plus the next 4 lines read from the same file iterator
        liner = [line] + list(islice(f, 4))
        for i in liner:
            tasker.put(str(re.sub('\n', '', i.strip())))
Now multiprocessing runs in a reasonable time without slowing down, but it builds a small list (the current line plus up to 4 more) whose items are then put on the Queue() one by one until the list is empty. So it's basically copying items from one list into another, weird, I know.
Is there a better way to handle this without using too much memory?
CodePudding user response:
The two code snippets are equivalent in the output they produce; the second one is only slower, and it only fixes the problem by "stalling the CPU".
The correct way to limit memory usage is to limit the queue itself: Queue accepts its maximum size as an argument.
from multiprocessing import Queue
max_size = 1000
tasker = Queue(max_size)
If the main process tries to put more tasks in the queue than max_size allows, put() blocks the main process until the workers finish some of the work. This is fast and efficient in terms of CPU and memory usage: while the main process is blocked, the workers can use its CPU core to finish some of the work, so more work gets done without consuming more memory.
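To see the bound in action without starting any workers, here is a minimal sketch. The helper `try_put`, the short timeout, and the example hostnames are made up for illustration; with a plain `put()` and no timeout the third call would simply block until a worker took an item.

```python
from multiprocessing import Queue
from queue import Full  # multiprocessing.Queue raises queue.Full on a timed-out put


def try_put(tasker, item, timeout=0.1):
    """Hypothetical helper: return True if the item was queued, False if the queue stayed full."""
    try:
        tasker.put(item, timeout=timeout)
        return True
    except Full:
        return False


tasker = Queue(2)  # at most 2 pending items
names = ("a.example.com", "b.example.com", "c.example.com")

# Nothing consumes from the queue here, so the third put cannot succeed.
accepted = [try_put(tasker, name) for name in names]
print(accepted)
```

In the real program the workers keep calling `get()`, so the blocked `put()` returns as soon as a slot frees up.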
If you don't want the main process to be blocked because it has other things to do, you can have a thread in the main process put the tasks in the queue, using the threading module.
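A rough sketch of that idea, under some assumptions: the function name `feed`, the `None` sentinel used to signal the end of the input, and the sample lines are all invented for this example, and the main thread drains the queue itself as a stand-in for real worker processes.

```python
import threading
from multiprocessing import Queue

SENTINEL = None  # assumed end-of-input marker, one per consumer


def feed(lines, tasker, n_consumers=1):
    """Runs in a background thread; only this thread blocks when the queue is full."""
    for line in lines:
        tasker.put(line.strip())
    for _ in range(n_consumers):
        tasker.put(SENTINEL)


tasker = Queue(2)  # small bound so the feeder thread really has to wait
lines = [f"host{i}.example.com\n" for i in range(6)]

feeder = threading.Thread(target=feed, args=(lines, tasker), daemon=True)
feeder.start()

# The main thread is free to do other work; here it just drains the queue.
received = []
while True:
    item = tasker.get()
    if item is SENTINEL:
        break
    received.append(item)

feeder.join()
print(len(received), "items received")
```

With real workers, each worker process would run the `get()` loop instead of the main thread, and `n_consumers` would be the number of workers.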
If you want to reduce the overhead of locking the queue, you should submit work in chunks.
from itertools import islice
from multiprocessing import Queue

max_size = 200  # note: 200 * 5 = 1000
tasker = Queue(max_size)
with open('file.txt', 'r') as f:
    for line in f:
        # current line plus the next 4 lines read from the same file iterator
        liner = [line] + list(islice(f, 4))
        liner = [x.strip() for x in liner]
        tasker.put(liner)  # put the list of up to 5 items in the queue
The consumer is then expected to loop over the list obtained from the queue and perform each task in the received list independently.
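The consumer side could look roughly like this. It is only a sketch: `put_chunks`, `consume`, the `handle` callback (a placeholder for sending the request), and the `None` sentinel are names assumed for the example, and both sides run one after the other in a single process so the logic is easy to follow.

```python
from itertools import islice
from multiprocessing import Queue


def put_chunks(lines, tasker, chunk_size=5):
    """Producer: put lists of up to chunk_size stripped lines, then a None sentinel."""
    it = iter(lines)
    while True:
        chunk = [x.strip() for x in islice(it, chunk_size)]
        if not chunk:
            break
        tasker.put(chunk)
    tasker.put(None)  # with N workers, put N sentinels instead


def consume(tasker, handle):
    """Consumer: take one chunk at a time and handle each item in it independently."""
    done = 0
    while True:
        chunk = tasker.get()
        if chunk is None:
            break
        for item in chunk:
            handle(item)  # placeholder for the per-subdomain request
            done += 1
    return done


tasker = Queue(200)  # bound is counted in chunks, not in lines
lines = [f"sub{i}.example.com\n" for i in range(12)]
seen = []

put_chunks(lines, tasker)            # 3 chunks (5 + 5 + 2) plus the sentinel
count = consume(tasker, seen.append)
print(count)
```

In the actual setup `consume` would be the target of each worker process, while the main process (or a feeder thread) runs the producer loop.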