Need help with multiprocessing!
So, I've been building my code here that converts xlsx to csv and puts it into a temporary folder, then appends those files into a list which will be concatenated and (in the future) sent to a database. For now, this is the code I have working:
from pathlib import Path
from xlsx2csv import Xlsx2csv
import time
from pathlib import Path
import os
import glob
import pandas as pd
import shutil
# Wall-clock reference for the elapsed-time printouts made during conversion.
start_time = round(time.time(), 2)
root_path = Path("C:/Teste")
# Path.glob returns a lazy, one-shot iterator over the entries of root_path.
files = Path(root_path).glob('*')
# Build the folder pattern from the root path (formatting the glob iterator
# itself would only embed its repr in the string).
CSVFolder = "{}\\TempCSV".format(root_path)
CSVFolderLoc = glob.glob(CSVFolder)
directory = "TempCSV"
JoinTempPath = os.path.join(root_path, directory)

# Rebuild the temporary folder only when this file is run as a script.
# A process that merely imports the module (as multiprocessing's "spawn"
# start method does for every child) must not delete the folder again.
if __name__ == "__main__":
    print("Creating a temporary folder...")
    # os.mkdir raises if the folder already exists, so remove any leftover.
    if os.path.isdir(JoinTempPath):
        print("--Removing existent Temporary CSV Directory--")
        shutil.rmtree(JoinTempPath)
    # Create the directory 'TempCSV' inside root_path.
    os.mkdir(JoinTempPath)
    print("Directory '% s' created" % directory)
def func():
    """Convert every ``.xlsx`` entry of ``files`` to CSV and combine the results.

    Each workbook is written to ``JoinTempPath`` as ``<stem>.csv`` with
    Xlsx2csv and read back with ``pd.read_csv(..., dtype='unicode')``; the
    resulting frames are concatenated. Seconds elapsed since ``start_time``
    are printed around each read and after the concatenation.

    Returns:
        The concatenated DataFrame, or an empty DataFrame when ``files``
        yields no ``.xlsx`` entry (``pd.concat`` rejects an empty list).
    """
    frames = []
    for file in files:
        if file.suffix.lower() != '.xlsx':
            continue
        print("Now converting '% s' file" % file)
        # Convert the workbook to a CSV inside the temporary folder.
        destFile = os.path.join(JoinTempPath, "{}.csv".format(file.stem))
        print(destFile)
        Xlsx2csv(file, outputencoding="utf-8").convert(destFile)
        # Read the converted file exactly once. destFile is a string, so
        # looping over it would parse and append the same CSV once per
        # character of its path.
        print(round(time.time() - start_time, 2))
        frames.append(pd.read_csv(destFile, dtype='unicode'))
        print(round(time.time() - start_time, 2))
    if not frames:
        return pd.DataFrame()
    # Concatenate (will be sent to a database in the next version).
    x = pd.concat(frames)
    print(round(time.time() - start_time, 2))
    return x


# Execute the function only when this file is run as a script, so that a
# process which merely imports the module does not repeat the conversion.
if __name__ == "__main__":
    func()
I need your help to improve the speed. I have been trying to implement Multiprocessing but I have failed. I've tried something like this but I saw nothing different happening:
import multiprocessing
(...)
# NOTE: both processes are created with target=func and no arguments, and
# func() walks the complete file listing on its own. Each process therefore
# performs the whole conversion; the work is duplicated, not divided.
if __name__ == '__main__':
p1 = multiprocessing.Process(target= func)
p2 = multiprocessing.Process(target= func)
# Launch both processes, then wait for both to finish.
p1.start()
p2.start()
p1.join()
p2.join()
Can you please help me implement multiprocessing or other solution for this loop to run in parallel?
I've also tried this:
# NOTE: every process is given func with no arguments, so even once the
# error below is fixed each process would repeat the full conversion.
if __name__ == '__main__':
processes=[]
num_processes= os.cpu_count() #nr of cpus to distribute workload
#create processes and assign a function for each process
for i in range(0, num_processes):
# NOTE(review): Process is used unqualified here; presumably it is imported
# from multiprocessing in a part of the script that is not shown - confirm.
process = Process(target=func)
# BUG: this appends the Process class itself, not the `process` instance
# created on the previous line, so the list only ever holds the class.
processes.append(Process)
#start all processes
for process in processes:
# start() is called on the class with no instance, which raises
# "TypeError: BaseProcess.start() missing 1 required positional argument: 'self'".
process.start()
for process in processes:
process.join()
But got this error:
Traceback (most recent call last):
File "c:\Users\CatarinaRibeiro\Desktop\DatabaseConfig\Main2.py", line 130, in <module>
process.start()
TypeError: BaseProcess.start() missing 1 required positional argument: 'self'
CodePudding user response:
That's because, the way you've written it, both processes convert all of the files. To get a speed-up, you need to split the file list between the processes and run each process on only its share of the files.
import os
import glob
import pandas as pd
import shutil
# Wall-clock reference for the elapsed-time printouts made during conversion.
start_time = round(time.time(), 2)
root_path = Path("C:/Teste")
# Path.glob returns a lazy, one-shot iterator over the entries of root_path.
files = Path(root_path).glob('*')
# Build the folder pattern from the root path (formatting the glob iterator
# itself would only embed its repr in the string).
CSVFolder = "{}\\TempCSV".format(root_path)
CSVFolderLoc = glob.glob(CSVFolder)
directory = "TempCSV"
JoinTempPath = os.path.join(root_path, directory)

# Rebuild the temporary folder only in the process that runs this file as a
# script. Under multiprocessing's "spawn" start method every worker imports
# this module again; without the guard each worker would delete the folder
# while the others are writing into it.
if __name__ == "__main__":
    print("Creating a temporary folder...")
    # os.mkdir raises if the folder already exists, so remove any leftover.
    if os.path.isdir(JoinTempPath):
        print("--Removing existent Temporary CSV Directory--")
        shutil.rmtree(JoinTempPath)
    # Create the directory 'TempCSV' inside root_path.
    os.mkdir(JoinTempPath)
    print("Directory '% s' created" % directory)
def func(files_list):
    """Convert the ``.xlsx`` entries of *files_list* to CSV and combine them.

    Each workbook is written to ``JoinTempPath`` as ``<stem>.csv`` with
    Xlsx2csv and read back with ``pd.read_csv(..., dtype='unicode')``; the
    resulting frames are concatenated. Seconds elapsed since ``start_time``
    are printed around each read and after the concatenation.

    Args:
        files_list: Iterable of path objects exposing ``suffix`` and
            ``stem``; entries whose suffix is not ``.xlsx`` are skipped.

    Returns:
        The concatenated DataFrame, or an empty DataFrame when
        *files_list* contains no ``.xlsx`` entry (``pd.concat`` rejects an
        empty list, which would otherwise crash a worker whose share of
        the files holds no workbook).
    """
    frames = []
    for file in files_list:
        if file.suffix.lower() != '.xlsx':
            continue
        print("Now converting '% s' file" % file)
        # Convert the workbook to a CSV inside the temporary folder.
        destFile = os.path.join(JoinTempPath, "{}.csv".format(file.stem))
        print(destFile)
        Xlsx2csv(file, outputencoding="utf-8").convert(destFile)
        # Read the converted file exactly once. destFile is a string, so
        # looping over it would parse and append the same CSV once per
        # character of its path.
        print(round(time.time() - start_time, 2))
        frames.append(pd.read_csv(destFile, dtype='unicode'))
        print(round(time.time() - start_time, 2))
    if not frames:
        return pd.DataFrame()
    # Concatenate (will be sent to a database in the next version).
    x = pd.concat(frames)
    print(round(time.time() - start_time, 2))
    return x
if __name__ == '__main__':
    # Materialise the listing so it can be sliced into two halves.
    files = list(files)
    midpoint = len(files) // 2
    # One worker process per half of the file list.
    p1, p2 = (
        multiprocessing.Process(target=func, args=(half,))
        for half in (files[:midpoint], files[midpoint:])
    )
    # Start both workers before waiting on either, so they run concurrently.
    for worker in (p1, p2):
        worker.start()
    for worker in (p1, p2):
        worker.join()
CodePudding user response:
It looks like you're spawning two processes which both do the same thing, so it'll either be the same or slower (due to file contention). You need the processes to split the work. An obvious choice here would be to split the list of files N-ways and have each process do 1/Nth of the work. Finding the files and the final concatenation should happen in the main process for simplicity's sake.
I think a good way to use multiprocessing (or multithreading) in Python is the `concurrent.futures` module, which simplifies things. Here's a really simple example with a similar structure to your code:
from concurrent.futures import ProcessPoolExecutor as Executor
import glob
import time
def process_file(file):
    """Stand-in worker: pause for half a second, then return the file's text.

    Args:
        file: Path of the file to open in text mode.

    Returns:
        The complete contents of *file* as a string.
    """
    # Placeholder for some long-running processing.
    time.sleep(0.5)
    # Processing finished; hand back what the file contains.
    with open(file, 'r') as handle:
        contents = handle.read()
    return contents
if __name__ == "__main__":
    # Find some files
    files = glob.glob('*.txt')
    print(f'Running process_file on {files}')
    # Run process_file in parallel for each file in files. The with-block
    # shuts the pool down once all results are in, and list() collects the
    # results (in input order) before that happens.
    with Executor() as executor:
        results = list(executor.map(process_file, files))
    # Concatenate
    final_result = '\n'.join(results)
    print(final_result)
You'd basically move the body of your `for file in files` loop to a function like `process_file` above, and do a `list = Executor().map(process_file, files)` call instead of explicitly looping.