How to use multiprocessing in my current Python application?


I have an application that reads thousands of files from different directories, does some processing on them, and then sends the data to a database. The problem is that it takes roughly an hour to finish all the files in one directory, and I have 19 directories (there could be more in the future). Right now it works through one directory after another; I want to run everything in parallel to speed things up.

This is my code:

import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser

config = configparser.ConfigParser()                         
config.read(r'C:\Desktop\Energy\file_cfg.ini')  # raw string, so '\f' is not read as a form feed

source = config['PATHS']['source']
archive = config['PATHS']['archive']

mydb = mysql.connector.connect(
        host= config['DB']['host'],
        user = config['DB']['user'],
        passwd = config['DB']['passwd'],
        database= config['DB']['database']
    )

cursor = mydb.cursor()
select_antenna = "SELECT * FROM `antenna`"
cursor.execute(select_antenna)

mp_mysql = [i[0] for i in cursor.fetchall()]
mp_server = os.listdir(source)

# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")

for mp in mp_mysql:
    if mp in mp_server:
        subdir_paths = os.path.join(source, mp)

        for file in os.listdir(subdir_paths):
            file_paths = os.path.join(subdir_paths, file)
                        
            cr_time_s = os.path.getctime(file_paths)
            cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))
                        
        all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
        full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.
                
        if full_file_paths != []:
            newest_file_paths = max(full_file_paths, key=os.path.getctime)
                        
            for file in all_file_paths:
                if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120: 
                    with open(file, 'rt') as f:
                        reader = csv.reader(f, delimiter ='\t')
                        line_data0 = list()
                        col = next(reader)
                        
                        for line in reader:
                            line.insert(0, mp)
                            line.insert(1, cr_time)
                            if line != []: #<----- Control empty directories.
                                line_data0.append(line)
                            
                                                                                                                                              
                        q1 = ("INSERT INTO microbeats"
                                     "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
                                     "VALUES (%s, %s, %s,%s, %s, %s, %s)")

                        for line in line_data0:
                             cursor.execute(q1, line)  

Answer:

I am using multiprocessing, where each process has its own connection to the database. I have made minimal changes to your code in an attempt to process the directories in parallel. One aside: I am not sure a variable such as subdir_paths is correctly named, since the "s" at the end of its name implies that it contains multiple path names, when it actually holds a single path.

The reason it has been suggested that this question is better suited for Code Review is that you presumably already have a working program and are only looking for a performance improvement (which, of course, applies to a large percentage of the questions posted on SO that are tagged with multiprocessing). That type of question is supposed to be posted on https://codereview.stackexchange.com/.
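Before the full listing, here is the skeleton of the approach as a minimal sketch: one pool worker per directory, and every worker opens (and closes) its own connection, because mysql.connector connection objects cannot safely be shared across processes. It assumes the same file_cfg.ini and database settings as above; the handle_directory name and the dirs list are placeholders, not part of your program:

import configparser
from multiprocessing import Pool, cpu_count

import mysql.connector

config = configparser.ConfigParser()
config.read(r'C:\Desktop\Energy\file_cfg.ini')

def handle_directory(mp):
    # One connection per worker process; never pass a live connection
    # object from the parent to a child.
    mydb = mysql.connector.connect(
        host=config['DB']['host'],
        user=config['DB']['user'],
        passwd=config['DB']['passwd'],
        database=config['DB']['database'],
    )
    try:
        pass  # read the files under `mp` and insert the rows here
    finally:
        mydb.close()
    return mp

if __name__ == '__main__':
    dirs = ['dir_a', 'dir_b']  # placeholder directory names
    with Pool(min(cpu_count(), len(dirs))) as pool:
        for finished in pool.imap_unordered(handle_directory, dirs):
            print('done:', finished)

And here is your code adapted along those lines: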

import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count

config = configparser.ConfigParser()
config.read(r'C:\Desktop\Energy\file_cfg.ini')  # raw string, so '\f' is not read as a form feed

source = config['PATHS']['source']
archive = config['PATHS']['archive']

def get_connection():
    # Each caller (including each worker process) opens its own connection.
    mydb = mysql.connector.connect(
            host= config['DB']['host'],
            user = config['DB']['user'],
            passwd = config['DB']['passwd'],
            database= config['DB']['database']
        )
    return mydb

def get_mp_list():
    select_antenna = "SELECT * FROM `antenna`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antenna)

    mp_mysql = [i[0] for i in cursor.fetchall()]
    mp_server = os.listdir(source)

    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()

    mp_list = [mp for mp in mp_mysql if mp in mp_server]
    return mp_list

def process_mp(mp):
    subdir_paths = os.path.join(source, mp)
    for file in os.listdir(subdir_paths):
        file_paths = os.path.join(subdir_paths, file)

        cr_time_s = os.path.getctime(file_paths)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.

    if full_file_paths != []:
        newest_file_paths = max(full_file_paths, key=os.path.getctime)

        mydb = get_connection()
        cursor = mydb.cursor()
        did_insert = False
        q1 = ("INSERT INTO microbeats"
                     "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
                     "VALUES (%s, %s, %s,%s, %s, %s, %s)")
        for file in all_file_paths:
            if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                with open(file, 'rt') as f:
                    reader = csv.reader(f, delimiter ='\t')
                    line_data0 = list()
                    col = next(reader)  # skip the header row

                    for line in reader:
                        line.insert(0, mp)
                        line.insert(1, cr_time)
                        if line != []: #<----- Control empty directories.
                            line_data0.append(line)

                    if line_data0:
                        cursor.executemany(q1, line_data0)
                        did_insert = True
        if did_insert:
            mydb.commit()
        mydb.close()

def main():
    mp_list = get_mp_list()
    # Use the pool as a context manager so it is shut down when done.
    with Pool(min(cpu_count(), len(mp_list))) as pool:
        results = pool.imap_unordered(process_mp, mp_list)
        while True:
            try:
                result = next(results)
            except StopIteration:
                break
            except BaseException as e:
                print(e)

if __name__ == '__main__':
    main()
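Two notes on the structure. The if __name__ == '__main__': guard is required here: on Windows (which your C:\ paths suggest) multiprocessing starts each worker by importing the module in a fresh interpreter, and without the guard that import would try to run main() again in every child. And imap_unordered hands back results as individual workers finish, so the try/except loop in main() reports a failure in any one directory while the remaining directories keep going; batching the rows with cursor.executemany instead of one execute per row also cuts the number of round trips to the server, which should help even before any parallelism.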