I have a CSV parser script in Python that does some processing on a big CSV file. There are around 1 million rows, so the process takes some time.
import csv
import sys

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        ParserFunction(row)
Is there a way to multi-thread this loop to reduce the execution time?
Thanks
CodePudding user response:
You can process each row on its own thread, instead of having the main thread wait for the previous row to finish before moving on to the next one:
import csv
import sys
import threading

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        # Start a new thread per row (args must be a tuple)
        threading.Thread(target=ParserFunction, args=(row,)).start()
But the exact approach depends on what logic you want to run on each row and whether it depends on other rows. Also note that, because of CPython's GIL, threads only speed this up if the per-row work is I/O-bound (e.g. database or network calls) rather than pure Python computation.
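For completeness, a minimal sketch of the same idea with a bounded pool, using concurrent.futures.ThreadPoolExecutor from the standard library (this variant is my addition, not part of the original answer; the worker count is a guess to tune for your workload):

import csv
from concurrent.futures import ThreadPoolExecutor

def ParserFunction(row):
    # Some I/O-bound logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    # The pool caps the number of live threads and reuses them,
    # instead of spawning one thread per row
    with ThreadPoolExecutor(max_workers=32) as executor:
        # Consuming the result iterator re-raises any worker exception
        for _ in executor.map(ParserFunction, reader):
            pass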
CodePudding user response:
Thanks, @Bemwa Malak, for opening my mind. Yeah, the file is quite big, around 400 MB with over 1 million rows. I'm using Python 3, so I had to adapt your idea a little.
Is this the right way to limit the number of threads?
# Threading
def runThreads():
    global data
    global loopParameter
    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        for thread in threads:
            thread.join()
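A side note (not from the original thread): joining in batches means the loop waits for the slowest of each group of 500 threads before starting any new ones. A sketch of an alternative using threading.BoundedSemaphore, where a new thread can start as soon as any slot frees up (ParserFunction and the reader setup as above):

import threading

MAX_THREADS = 500  # assumed limit, same as the batch size above
slots = threading.BoundedSemaphore(MAX_THREADS)

def worker(row):
    try:
        ParserFunction(row)
    finally:
        slots.release()  # free the slot even if the row fails

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        slots.acquire()  # blocks while MAX_THREADS rows are in flight
        threading.Thread(target=worker, args=(row,)).start()

# Drain: once we can reacquire every slot, all workers have finished
for _ in range(MAX_THREADS):
    slots.acquire()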
The complete code of my logic is something like this. From testing on a smaller file I got an execution time of around 60 s without threading and around 7 s with threading, so it is much faster now.
And I tested it on the original file: I got an execution time of 45 minutes instead of 4 hours, so it's working.
from sqlalchemy import create_engine
import csv
import sys
import threading
import pandas as pd
import time
# Create the DB engine and a variable to hold the data
engine = create_engine('mssql+pyodbc://SECRET')
data = []
counter = 0
loopParameter = 0
# Starter
def main():
    global data
    global totalRows
    runThreads()
    totalRows = len(data)
    # Save the data variable to a new CSV file
    with open('output.csv', 'w', newline='', encoding='utf-8') as f2:
        writer = csv.writer(f2, delimiter=';')
        writer.writerows(data)
    # Log
    with open('log2', 'a') as f3:
        sys.stdout = f3  # Change the standard output to the file we created
        print("Version: 1")
        print("Matched: ", len(data))
# My Parser
def ParserFunction(row):
    global data
    global counter
    # Sum of matches across the four tables (string concatenation restored;
    # a parameterized query would be safer against SQL injection)
    query = ("SELECT (SELECT Count(*) FROM myTab WHERE myColumn='" + row[5] + "') "
             "+ (SELECT Count(*) FROM myTab2 WHERE myColumn='" + row[5] + "') "
             "+ (SELECT Count(*) FROM myTab3 WHERE myColumn='" + row[5] + "') "
             "+ (SELECT COUNT(*) FROM myTab4 WHERE myColumn='" + row[1] + "')")
    with engine.connect() as con:
        rs = con.execute(query)
        # We have a match
        if rs.fetchone()[0] > 0:
            # Add the row to the data variable
            data.append(row)
# Threading
def runThreads():
    global data
    global loopParameter
    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            # Once 500 threads are running, wait for all of them to finish
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        # Wait for any remaining threads
        for thread in threads:
            thread.join()
main()
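One caveat with the parser above (my note, not from the original answer): concatenating row values straight into the SQL string is vulnerable to SQL injection and breaks on values containing quotes. A sketch of a parameterized variant using sqlalchemy.text; the :v5/:v1 parameter names are made up for this sketch, and the exact execute() signature depends on your SQLAlchemy version:

from sqlalchemy import text

# Bound parameters instead of string concatenation; :v5 and :v1 are
# placeholder names chosen for this sketch
MATCH_QUERY = text(
    "SELECT (SELECT COUNT(*) FROM myTab WHERE myColumn = :v5) "
    "+ (SELECT COUNT(*) FROM myTab2 WHERE myColumn = :v5) "
    "+ (SELECT COUNT(*) FROM myTab3 WHERE myColumn = :v5) "
    "+ (SELECT COUNT(*) FROM myTab4 WHERE myColumn = :v1)"
)

def ParserFunction(row):
    global data
    with engine.connect() as con:
        rs = con.execute(MATCH_QUERY, {"v5": row[5], "v1": row[1]})
        if rs.fetchone()[0] > 0:
            data.append(row)  # list.append is atomic under the GIL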