I have a tool that generates a large number of files (ranging from hundreds of thousands to few millions) whenever it runs. All these files can be read independently of each other. I need to parse them and summarize the information.
Dummy example of generated files:
File1:
NAME=John AGE=25 ADDRESS=123 Fake St
NAME=Jane AGE=25 ADDRESS=234 Fake St
File2:
NAME=Dan AGE=30 ADDRESS=123 Fake St
NAME=Lisa AGE=30 ADDRESS=234 Fake St
Summary - counts how many times an address appeared across all files:
123 Fake St - 2
234 Fake St - 2
I want to use parallelization to read them, so multiprocessing
or asyncio
come to mind (I/O intensive operations). I plan to do the following operations in a single unit/function that will be called in parallel for each file:
- Open the file, go line by line
- Populate a unique dict containing the information provided by this file specifically
- Close the file
Once I am done reading all the files in parallel, and have one dict per file, I can now loop over each dict and summarize as needed.
The reason I think I need this two step process, is I can't have multiple parallel calls to that function directly summarize and write to a common summary dict. That will mess things up.
But that means I will consume a large amount of memory (due to holding those many hundreds of thousands to millions of dicts in memory).
What would be a good way to get the best of both worlds - runtime and memory consumption - to meet this objective?
CodePudding user response:
Based on the comments here's example using multiprocessing.Pool
.
Each process reads one file line by line and sends the result back to main process to collect.
import re
import multiprocessing
from collections import Counter
pat = re.compile(r"([A-Z]{2,})=(. ?)(?=[A-Z]{2,}=|$)")
def process_file(filename):
c = Counter()
with open(filename, "r") as f_in:
for line in f_in:
d = dict(pat.findall(line))
if "ADDRESS" in d:
c[d["ADDRESS"]] = 1
# send partial result back to main process:
return c
if __name__ == "__main__":
# you can get filelist for example from `glob` module:
files = ["file1.txt", "file2.txt"]
final_counter = Counter()
with multiprocessing.Pool(processes=4) as pool:
# iterate over files and update the final Counter:
for result in pool.imap_unordered(process_file, files):
final_counter.update(result)
# print final Counter:
for k, v in final_counter.items():
print("{:<20} {}".format(k, v))
Prints:
123 Fake St 2
234 Fake St 2
Note: You can use tqdm
module to get nice progress bar, for example:
...
from tqdm import tqdm
...
for result in pool.imap_unordered(process_file, tqdm(files)):
...
CodePudding user response:
Solution
Redis Queue RQ is likely the most appropriate datastore for this use-case. This will allow you to process the files in parallel while managing resources without needing to use multiprocessing—while you still could if you really needed to.
The benefit with this approach, in contrast with solely using multiprocessing
or async
is that it scales horizontally when used in conjunction with Redis Cluster.
Pseudocode
from redis import Redis
from rq import Queue, Retry
def do_work(f):
if maximum_filesize_in_queue_exceeds_resources():
raise ResourcesUnavailableError
else:
f.read()
...
def run(files_to_process):
q = Queue()
for f in files_to_process:
with open(filename, "r") as f:
q.enqueue(do_work, f, retry=Retry(max=3, interval=[10, 30, 60]))
if __name__ == "__main__":
files_to_process = ("foo", "bar", "baz")
run(files_to_process)
...
Results
You could use Redis to also store the results from each worker, and use rq's success hook to update the summary, which would also be stored in Redis. Once all queues have completed—you could then print a report with result and summary information stored in Redis.