Python Multiprocessing write to csv data for huge volume files

Time:09-24

I am trying to run a calculation on a large file and write the results to another text file using multiprocessing. The row count in the output file does not match what I expect, and I get a different count every time I run the program.

I am new to Python. Could someone please help?

import pandas as pd
import multiprocessing as mp

source = "\\share\usr\data.txt"
target = "\\share\usr\data_masked.txt"

Chunk = 10000

def process_calc(df):
    ''' 
        get source df do calc and return newdf
        ...
    '''
 return(newdf)        
  
def calc_frame(df):
    output_df = process_calc(df)
    output_df.to_csv(target,index=None,sep='|',mode='a',header=False)

if __name__ == '__main__':
    reader= pd.read_table(source,sep='|',chunksize = chunk,encoding='ANSI')
    pool = mp.Pool(mp.cpu_count())
    jobs = []
    
    for each_df in reader:
        process = mp.Process(target=calc_frame,args=(each_df)
        jobs.append(process)
        process.start()
    
    for j in jobs:
        j.join()

CodePudding user response:

Your source as posted has several issues that would prevent it from even compiling, let alone running. I have tried to correct them while also solving your main problem, but do check the code below thoroughly to make sure the corrections make sense.

First, the args argument to the Process constructor must be a tuple. You specified args=(each_df), but (each_df) is not a tuple; it is simply a parenthesized expression. You need (each_df,) to make it a tuple. (The statement is also missing a closing parenthesis.)
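The difference is easy to check in an interpreter. The sketch below uses a hypothetical stand-in (a plain list named chunk) in place of a dataframe; it only illustrates how Python reads the two spellings.

```python
# Hypothetical stand-in for a dataframe chunk; any object behaves the same way.
chunk = [1, 2, 3]

not_a_tuple = (chunk)   # parentheses only group; this is still the list itself
one_tuple = (chunk,)    # the trailing comma is what creates a 1-element tuple

print(type(not_a_tuple).__name__)  # list
print(type(one_tuple).__name__)    # tuple

# Process(target=f, args=(chunk,)) calls f(chunk).
# With args=(chunk), the list itself is unpacked as the argument sequence,
# so f would be called as f(1, 2, 3).
```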

Beyond that, there are two problems. Nothing prevents multiple processes from appending to the same file at the same time, and you cannot be sure of the order in which the processes complete, so you have no real control over the order in which the dataframes are appended to the CSV file.

The solution is to use a processing pool with the imap method. The iterable to pass to this method is just the reader, which when iterated returns the next dataframe to process. The return value from imap is an iterable that, when iterated, returns the next return value from the worker function in task-submission order, i.e. the same order in which the dataframes were submitted. So as these new, modified dataframes are returned, the main process can simply append them to the output file one by one:

import pandas as pd
import multiprocessing as mp

source = r"\\share\usr\data.txt"
target = r"\\share\usr\data_masked.txt"

Chunk = 10000

def process_calc(df):
    ''' 
        get source df do calc and return newdf
        ...
    '''
    return(newdf)

def calc_frame(df):
    output_df = process_calc(df)
    return output_df

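if __name__ == '__main__':
    with mp.Pool() as pool:
        # Read the source lazily, one dataframe of Chunk rows at a time.
        reader = pd.read_table(source, sep='|', chunksize=Chunk, encoding='ANSI')
        # imap yields results in submission order; only the main process writes.
        for output_df in pool.imap(process_calc, reader):
            output_df.to_csv(target, index=False, sep='|', mode='a', header=False)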
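
The ordering behaviour of imap can be checked in isolation with a toy task. This is only a sketch: slow_square and its sleep times are made up for illustration, and it uses multiprocessing.pool.ThreadPool (threads, not processes) because it exposes the same imap and imap_unordered methods as Pool and runs without a __main__ guard. For CPU-bound work such as the calculation above you would still use mp.Pool.

```python
import time
from multiprocessing.pool import ThreadPool  # same imap interface as mp.Pool

def slow_square(n):
    # Hypothetical task: smaller inputs sleep longer, so they tend to finish last.
    time.sleep((4 - n) * 0.05)
    return n * n

with ThreadPool(3) as pool:
    # imap_unordered yields results as tasks complete, whatever the order.
    by_completion = list(pool.imap_unordered(slow_square, [1, 2, 3]))
    # imap yields results in the order the tasks were submitted.
    by_submission = list(pool.imap(slow_square, [1, 2, 3]))

print(by_completion)  # order may vary from run to run
print(by_submission)  # [1, 4, 9]
```

Appending results in the order given by by_completion is what makes the output file's row order unpredictable; iterating over imap instead keeps the rows in the same order as the input chunks.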