Multi-thread an operation that loops through a CSV file in Python


I have a CSV file parser script in Python that does some work on a big CSV file. There are around 1 million rows, so the process takes some time.

import csv

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        ParserFunction(row)

Is there a way to multi-thread this loop to lower the execution time?

Thanks

CodePudding user response:

You can hand each row off to its own thread, instead of having the main thread wait for the previous row to finish before it starts on the next one:

import csv
import _thread

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        # start_new_thread expects the arguments as a tuple
        _thread.start_new_thread(ParserFunction, (row,))

But the exact approach depends on what logic you want to apply to each row, and on whether that logic depends on other rows or not.
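If the rows are independent and the work is mostly I/O-bound, the standard library's thread pool is a simpler way to cap the number of worker threads. A minimal sketch (the file name and pool size are just placeholders, not from your code):

import csv
from concurrent.futures import ThreadPoolExecutor

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    # max_workers is an assumption; tune it to the workload.
    # Note: map() collects the whole iterable up front, so for very
    # large files you may want to submit rows in chunks instead.
    with ThreadPoolExecutor(max_workers=50) as executor:
        executor.map(ParserFunction, reader)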

CodePudding user response:

Thanks, @Bemwa Malak, for opening my mind. Yeah, the file is quite big, around 400 MB with over 1 million rows. I'm using Python 3, so I had to edit your idea a little.

Is this the right idea of limiting the number of threads?

# Threading
def runThreads():
    global data
    global loopParameter

    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            # Once 500 threads have been started, wait for the whole
            # batch to finish before starting the next batch
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        # Wait for the last, partial batch
        for thread in threads:
            thread.join()
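One thing I noticed with this pattern is that each batch waits for its slowest thread before any new work starts. An alternative I considered (just a sketch, using threading.Semaphore to keep up to 500 threads in flight at all times):

import threading

# The cap of 500 mirrors the batch size above; purely an assumption
semaphore = threading.Semaphore(500)

def worker(row):
    try:
        ParserFunction(row)
    finally:
        semaphore.release()  # free a slot for the next row

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    threads = []
    for row in reader:
        semaphore.acquire()  # blocks while 500 workers are already running
        t = threading.Thread(target=worker, args=(row,))
        t.start()
        # Note: this list keeps every Thread object; for millions of
        # rows you would want to prune finished threads periodically
        threads.append(t)
    for t in threads:
        t.join()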

The complete code of my logic is something like this. From testing on a smaller file, I got an execution time of around 60 s without threading and around 7 s with threading, so it is much faster now.

And I tested it on the original file, and I got an execution time of 45 minutes instead of 4 hours, so somehow it's working.

from sqlalchemy import create_engine
import csv
import sys
import threading
import pandas as pd
import time

# Create a variable to hold the data
engine = create_engine('mssql+pyodbc://SECRET')
data = []
counter = 0
loopParameter = 0

# Starter
def main():
    global data
    global totalRows
    runThreads()
    totalRows = len(data)

    # Save data variable to a new CSV file
    with open('output.csv', 'w', newline='', encoding='utf-8') as f2:
        writer = csv.writer(f2, delimiter=';')
        writer.writerows(data)
        
    # Log
    with open('log2', 'a') as f3:
        sys.stdout = f3 # Change the standard output to the file we created.
        print("Version: 1")
        print("Matched: ", len(data))

# My Parser
def ParserFunction(row):
    global data
    global counter
    
    query = ("SELECT (SELECT Count(*) FROM myTab WHERE myColumn='" row[5] "')   "
            "(SELECT Count(*) FROM myTab2 WHERE myColumn='" row[5] "')   "
            "(SELECT Count(*) FROM myTab3 WHERE myColumn='" row[5] "')   "
            "(SELECT COUNT(*) FROM myTab4 WHERE myColumn='" row[1] "')")
        
    with engine.connect() as con:
        rs = con.execute(query)
        # We have match
        if(rs.fetchone()[0] > 0):               
            # Add the row to the data variable
            data.append(row)
    pass

# Threading
def runThreads():
    global data
    global loopParameter
    
    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        for thread in threads:
            thread.join()

main()
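A side note on ParserFunction (not part of the original code): building the query by string concatenation breaks on values containing quotes and is open to SQL injection. The same four-table count with bound parameters, assuming SQLAlchemy's text() construct and the same placeholder table and column names, would look like this:

from sqlalchemy import create_engine, text

engine = create_engine('mssql+pyodbc://SECRET')
data = []

# Bound parameters (:v, :w) handle quoting and escaping automatically
query = text(
    "SELECT (SELECT Count(*) FROM myTab WHERE myColumn = :v) + "
    "(SELECT Count(*) FROM myTab2 WHERE myColumn = :v) + "
    "(SELECT Count(*) FROM myTab3 WHERE myColumn = :v) + "
    "(SELECT COUNT(*) FROM myTab4 WHERE myColumn = :w)"
)

def ParserFunction(row):
    with engine.connect() as con:
        rs = con.execute(query, {"v": row[5], "w": row[1]})
        # We have a match
        if rs.fetchone()[0] > 0:
            data.append(row)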