Optimal way to use multiprocessing for many files


So I have a large list of files that need to be processed into CSVs. Each file itself is quite large, and each line is a string. Each line of the files could represent one of three types of data, each of which is processed a bit differently. My current solution looks like the following:

    import os
    import pandas as pd

    type1_columns = [...]
    type2_columns = [...]
    type3_columns = [...]

    file_list = os.listdir(filepath)

    def process_type1_line(json_line):
        #processing logic
        to_append = [a, b, c, d, e]
        type1_series = pd.Series(to_append, index=type1_columns)
        return type1_series


    def process_type2_line(json_line):
        #processing logic
        to_append = [a, b, c, d, e]
        type2_series = pd.Series(to_append, index=type2_columns)
        return type2_series


    def process_type3_line(json_line):
        #processing logic
        to_append = [a, b, c, d, e]
        type3_series = pd.Series(to_append, index=type3_columns)
        return type3_series


    def process_file(file):
        type1_df = pd.DataFrame(columns=type1_columns)
        type2_df = pd.DataFrame(columns=type2_columns)
        type3_df = pd.DataFrame(columns=type3_columns)

        with open(filepath/file) as f:
             data=f.readlines()
             for line in data:
                  #some logic to get the record_type and convert line to json
                  record_type = ...
                  json_line = ...

                  if record_type == "type1":
                       type1_series = process_type1_line(json_line)
                       type1_df = type1_df.append(type1_series, ignore_index=True)
                  if record_type == "type2":
                       type2_series = process_type2_line(json_line)
                       type2_df = type2_df.append(type2_series, ignore_index=True)
                  if record_type == "type3":
                       type3_series = process_type3_line(json_line)
                       type3_df = type3_df.append(type3_series, ignore_index=True)

        type1_df.to_csv(type1_csv_path)
        type2_df.to_csv(type2_csv_path)
        type3_df.to_csv(type3_csv_path)


    for file in file_list:
        process_file(file)

I iterate through the files and create dataframes for each of the three record types. I parse the lines and call the appropriate processing function for each one. The returned series is appended to the final dataframe for that record type for that file. Once the file is processed, the three dataframes are saved as CSVs and we move on to the next file.

The issue is that this approach takes far too long; it would take weeks to process all of the files.

I tried to modify my approach to use multiprocessing (which I don't have a ton of experience with) as follows:

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [executor.submit(process_file, file) for file in file_list]

From some logging print statements I can see that this started processing 30 files, but none have completed, so I at least know that my approach is flawed. Could anyone explain what the best approach to this problem would be? Perhaps some combination of multiprocessing and asyncio?

CodePudding user response:

You've got two big problems:

  1. You're loading the whole input file into memory, producing the entire result in memory, then writing the whole output file at once. This means that, if you have 30 workers operating in parallel, you need memory proportionate to 30 of your (self-described) large files. You also store all the data twice: once as the list of str lines returned by f.readlines(), then again in one of the three DataFrames. If you used your code as is, without executors, and just changed:

          data=f.readlines()
          for line in data:
    

    to:

          for line in f:
    

    you'd immediately reduce your memory usage by roughly half, which might be enough to stop page thrashing. That said, you'd still be using memory proportionate to file size to store the DataFrames, so you'd resume thrashing if you parallelized your code, and might still thrash even without parallelism if the files are large enough.

  2. You're using .append for every line which, IIRC, for DataFrames is a form of Schlemiel the Painter's algorithm: each append builds a brand-new DataFrame, copying the entire contents of the old DataFrame plus the tiny amount of new data into it, and the work takes longer and longer as the existing data grows; what should be amortized O(n) work becomes O(n**2) work (see the sketch just after this list).
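
To illustrate that second point, here's a minimal sketch (with made-up column names and records standing in for the real per-line processing) of the usual fix if you want to stay with pandas: accumulate plain Python lists and build the DataFrame once at the end. Note that DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, so on current pandas the original per-line append pattern won't even run:

    import pandas as pd

    type1_columns = ["a", "b", "c"]          # stand-in for the real column list

    rows = []                                # plain lists are cheap to append to
    for i in range(1000):                    # stand-in for iterating the lines of a file
        rows.append([i, i * 2, i * 3])       # stand-in for process_type1_line's output

    type1_df = pd.DataFrame(rows, columns=type1_columns)  # build the DataFrame exactly once
    print(type1_df.shape)                    # (1000, 3)

That said, the approach below drops the DataFrames entirely, which avoids the problem altogether.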

Between the two of those, you're using way more memory than needed, and performing a ton of unnecessary busywork on the repeated appends. The parallelism might help get the busywork done faster, but in exchange, it multiplies your memory requirements by 30; odds are, you don't have that much RAM (if these files are truly large, you may not have enough RAM for even one of them), and you end up page thrashing: writing memory out to the pagefile/swap file to make room for other stuff, reading it back in on demand, and frequently dropping memory that's paged in before you're through with it, so memory access becomes tied to disk performance, which is a few orders of magnitude slower than RAM access.

I don't know Pandas well enough to say if it offers some better, incremental solution for what you're doing, but you don't really need one; just work with the input line by line, and use the csv module to write rows as you go. Your memory requirements will drop from "proportionate to the size of each input file" to "proportionate to the data from each line of the input file".

Your process_file function would end up looking something like:

    import csv

    def process_file(file):
        # Open the input file and all three output files (newline='' is needed to play
        # nice with the csv module, which takes control of the newline format to ensure
        # dialect rules are followed precisely, regardless of OS line separator rules)
        with open(filepath/file) as f, \
             open(type1_csv_path, 'w', newline='') as type1f, \
             open(type2_csv_path, 'w', newline='') as type2f, \
             open(type3_csv_path, 'w', newline='') as type3f:
            csv1 = csv.writer(type1f)
            csv1.writerow(type1_columns)  # Omit if no useful column header
            csv2 = csv.writer(type2f)
            csv2.writerow(type2_columns)  # Omit if no useful column header
            csv3 = csv.writer(type3f)
            csv3.writerow(type3_columns)  # Omit if no useful column header

            # Directly iterating the file object lazily fetches one line at a time,
            # where .readlines() eagerly fetches the whole file, requiring a ton of
            # memory for no reason
            for line in f:
                #some logic to get the record_type and convert line to json
                record_type = ...
                json_line = ...

                if record_type == "type1":
                    type1_series = process_type1_line(json_line)
                    # Might need to convert to a plain list if the Series doesn't
                    # iterate the values you need directly
                    csv1.writerow(type1_series)
                elif record_type == "type2":
                    type2_series = process_type2_line(json_line)
                    csv2.writerow(type2_series)
                elif record_type == "type3":
                    type3_series = process_type3_line(json_line)
                    csv3.writerow(type3_series)
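
One note on the "might need to convert to a plain list" comment above: iterating a pandas Series yields its values, so csv1.writerow(type1_series) should normally write exactly those values. If it doesn't for your data, an explicit conversion is easy (a sketch, reusing the type1_series name from the code above):

    csv1.writerow(type1_series.tolist())  # Series.tolist() gives the values as a plain Python list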

If this works acceptably as is (without executors), just use it that way; if you were page thrashing even without executors, or the files were large enough that the repeated appends were hurting you badly, this alone might be enough to fix the problem. If it's still too slow, executors might provide a little benefit when you're doing a lot of work to process each line into the output format (while most workers are busy processing, one or two can adequately share disk access for reads and writes); but if the processing work per line is low, anything beyond a small handful of workers (I'd start with just two or three) will just increase disk contention (especially on a spinning hard drive rather than an SSD), and parallelism either won't help or will actively hurt.
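
If you do end up reaching for executors, here's a hedged sketch of what that might look like. It assumes the process_file and file_list from above, uses a ProcessPoolExecutor (separate processes sidestep the GIL if the per-line processing is CPU-heavy; swap in ThreadPoolExecutor if the work turns out to be I/O-bound), keeps the worker count small, and calls future.result() so any exception raised inside a worker actually surfaces instead of sitting silently on the future object:

    import concurrent.futures

    def main():
        # Small pool, per the advice above; bump it up only if it measurably helps.
        with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
            futures = {executor.submit(process_file, file): file for file in file_list}
            for future in concurrent.futures.as_completed(futures):
                file = futures[future]
                try:
                    future.result()          # re-raises any exception from the worker
                    print(f"finished {file}")
                except Exception as exc:
                    print(f"{file} failed: {exc}")

    if __name__ == "__main__":
        # Guard needed for ProcessPoolExecutor on platforms that spawn workers
        # (Windows, newer macOS); harmless everywhere else.
        main()

Because process_file already writes its three CSVs itself, each worker is independent and there's no shared state to coordinate between them.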

You may need to tweak the exact CSV dialect used (passed as arguments to csv.writer), and possibly set an explicit encoding for the output files rather than relying on the locale default (e.g. passing encoding='utf-8' or encoding='utf-16' to the open calls for the output files, so you're always writing in the encoding the consumer(s) of the .csv files expect), but that's the general form to go for.
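
For example, a minimal sketch of those tweaks (the path, delimiter, and encoding here are placeholders; use whatever the downstream consumers of the CSVs actually expect):

    import csv

    with open("type1.csv", "w", newline="", encoding="utf-8") as type1f:
        csv1 = csv.writer(
            type1f,
            delimiter=",",              # e.g. ";" if the consumer expects semicolon-separated files
            quoting=csv.QUOTE_MINIMAL,  # quote only fields containing the delimiter, quote char, or newline
        )
        csv1.writerow(["col_a", "col_b"])
        csv1.writerow(["value, with comma", 42])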
