Parsing large number of files in parallel and summarizing


I have a tool that generates a large number of files (from hundreds of thousands to a few million) each time it runs. All of these files can be read independently of one another. I need to parse them and summarize the information.

Dummy example of generated files:

File1:
NAME=John AGE=25 ADDRESS=123 Fake St
NAME=Jane AGE=25 ADDRESS=234 Fake St

File2:
NAME=Dan AGE=30 ADDRESS=123 Fake St
NAME=Lisa AGE=30 ADDRESS=234 Fake St

Summary - a count of how many times each address appeared across all files:

123 Fake St - 2
234 Fake St - 2

I want to read them in parallel, so multiprocessing or asyncio come to mind (the work is I/O intensive). I plan to do the following operations in a single unit/function that will be called in parallel for each file:

  1. Open the file, go line by line
  2. Populate a dict containing the information from this specific file
  3. Close the file

Once I am done reading all the files in parallel and have one dict per file, I can loop over the dicts and summarize as needed.
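
Roughly what I have in mind (an untested sketch; parse_file just stands in for the per-file parsing, and the merge logic is elided):

import multiprocessing

def parse_file(filename):
    # step 1: open the file, build a dict from its lines, close the file
    info = {}
    with open(filename) as f:
        for line in f:
            ...  # parse the line into info
    return info

def summarize(filenames):
    with multiprocessing.Pool() as pool:
        # one dict per file, built in parallel (this is where the memory piles up)
        per_file_dicts = pool.map(parse_file, filenames)

    # step 2: loop over the per-file dicts and build the summary
    summary = {}
    for d in per_file_dicts:
        ...  # merge d into summary
    return summary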

The reason I think I need this two-step process is that I can't have multiple parallel calls to that function directly summarize and write to a common summary dict; that would mess things up.

But that means I will consume a large amount of memory, since I would be holding hundreds of thousands to millions of dicts in memory at once.

What would be a good way to get the best of both worlds - runtime and memory consumption - to meet this objective?

CodePudding user response:

Based on the comments, here's an example using multiprocessing.Pool.

Each worker process reads one file line by line and sends the partial result back to the main process, which collects it.

import re
import multiprocessing
from collections import Counter

# matches KEY=value pairs, where the value runs until the next KEY= token or end of line
pat = re.compile(r"([A-Z]{2,})=(.+?)(?=[A-Z]{2,}=|$)")


def process_file(filename):
    c = Counter()
    with open(filename, "r") as f_in:
        for line in f_in:
            d = dict(pat.findall(line))
            if "ADDRESS" in d:
                c[d["ADDRESS"]]  = 1

    # send partial result back to main process:
    return c


if __name__ == "__main__":

    # you can get filelist for example from `glob` module:
    files = ["file1.txt", "file2.txt"]

    final_counter = Counter()

    with multiprocessing.Pool(processes=4) as pool:
        # iterate over files and update the final Counter:
        for result in pool.imap_unordered(process_file, files):
            final_counter.update(result)

    # print final Counter:
    for k, v in final_counter.items():
        print("{:<20} {}".format(k, v))

Prints:

123 Fake St          2
234 Fake St          2
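
With hundreds of thousands to millions of files, handing one filename at a time to the pool adds measurable overhead; imap_unordered accepts a chunksize argument that batches the work. A possible variation of the loop above (chunksize=100 is only an illustrative starting point to tune for your workload):

with multiprocessing.Pool(processes=4) as pool:
    # chunksize batches filenames per dispatch, reducing inter-process overhead
    for result in pool.imap_unordered(process_file, files, chunksize=100):
        final_counter.update(result)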

Note: You can use the tqdm module to get a nice progress bar, for example:

...
from tqdm import tqdm
...

for result in pool.imap_unordered(process_file, tqdm(files)):

...
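
One possible variation: wrapping the result iterator instead of the input list makes the bar advance as files finish processing rather than as they are handed out (total must then be given explicitly):

for result in tqdm(pool.imap_unordered(process_file, files), total=len(files)):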

CodePudding user response:

Solution

Redis Queue (RQ), with Redis as the backing datastore, is likely a good fit for this use case. It allows you to process the files in parallel while managing resources, without needing to use multiprocessing yourself (though you still could if you really needed to).

The benefit of this approach, in contrast with using multiprocessing or asyncio alone, is that it scales horizontally when used in conjunction with Redis Cluster.

Pseudocode

from redis import Redis
from rq import Queue, Retry

def do_work(filename):
    # hypothetical guard: back off if processing this file would exceed resources
    if maximum_filesize_in_queue_exceeds_resources():
        raise ResourcesUnavailableError
    # open and parse the file inside the worker; only the filename is enqueued,
    # since an open file handle can't be serialized and sent through the queue
    with open(filename, "r") as f:
        f.read()
        ...

def run(files_to_process):
    q = Queue(connection=Redis())

    for filename in files_to_process:
        q.enqueue(do_work, filename, retry=Retry(max=3, interval=[10, 30, 60]))

if __name__ == "__main__":
    files_to_process = ("foo", "bar", "baz")

    run(files_to_process)
    ...

Results

You could also use Redis to store the results from each worker, and use RQ's success hook to update the summary, which would likewise be stored in Redis. Once all jobs have completed, you could print a report from the result and summary information stored in Redis.
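
A rough sketch of that idea, assuming the worker returns a per-file {address: count} dict; the callback signature follows RQ's on_success convention, and the "summary" hash name is an arbitrary choice:

from collections import Counter

from redis import Redis
from rq import Queue, Retry

def process_file(filename):
    # worker: return an {address: count} mapping for one file (parsing elided)
    counts = Counter()
    ...
    return dict(counts)

def update_summary(job, connection, result, *args, **kwargs):
    # success hook: fold one file's counts into a shared Redis hash
    for address, n in result.items():
        connection.hincrby("summary", address, n)

def run(files_to_process):
    q = Queue(connection=Redis())
    for filename in files_to_process:
        q.enqueue(process_file, filename, on_success=update_summary,
                  retry=Retry(max=3, interval=[10, 30, 60]))

def report(connection):
    # once all jobs have finished, read the summary back out of Redis
    for address, n in connection.hgetall("summary").items():
        print(f"{address.decode():<20} {int(n)}")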

References

  1. https://redis.com

  2. https://python-rq.org

  3. https://redis.io/docs/manual/scaling/
