Home > Net >  How to solve the WinError when Multithreading
How to solve the WinError when Multithreading

Time:09-17

I have added the multiprocessing part to my code but I receive below error. I dont know what is wrong, I have placed everything inside the __main__ block as I though that would solve the issue, however that did not work.

[WinError None] The process cannot access the file because it is being used by another process: 'c:\\data\\Energy\\Desktop\\Source\\d\\New Text Document.txt' -> 'c:\\data\\Energy\\Desktop\\Source\\d\\New Text Document.txt'

This is my code:

import mysql.connector
import csv
import os
import time
import re
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count

config = configparser.ConfigParser()
config.read(r'Energy_cfg.ini')

source = config['PATHS']['source']
archive = config['PATHS']['archive']

def get_connection():
    mydb = mysql.connector.connect(
            host= config['DB']['host'],
            user = config['DB']['user'],
            passwd = config['DB']['passwd'],
            database= config['DB']['database']
        )
    return mydb

def get_mp_list():
    select_antennas = "SELECT * FROM `antennas`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antennas)

    mp_mysql = [i[0] for i in cursor.fetchall()]
    mp_server = os.listdir(source)

    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()

    mp_list = [mp for mp in mp_mysql if mp in mp_server]
    return mp_list

def process_mp(mp):
    subdir_paths = os.path.join(source, mp)
    for file in os.listdir(subdir_paths):
        file_paths = os.path.join(subdir_paths, file)

        cr_time_s = os.path.getctime(file_paths)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.

    if full_file_paths != []:
        newest_file_paths = max(full_file_paths, key=os.path.getctime)

        mydb = get_connection()
        cursor = mydb.cursor()
        did_insert = False
        for file in all_file_paths:
            if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                with open(file, 'rt') as f:
                    reader = csv.reader(f, delimiter ='\t')
                    line_data0 = list()
                    col = next(reader)

                    for line in reader:
                        line.insert(0, mp)
                        line.insert(1, cr_time)
                        if line != []: #<----- Control empty directories.
                            line_data0.append(line)

                        q1 = ("INSERT INTO microbeats"
                                     "(`antena`,`datetime`,`system`,`cave`,`Color`, `nature`, `options`)"
                                     "VALUES (%s, %s, %s,%s, %s, %s, %s)")

                    for line in line_data0:
                         cursor.execute(q1, line)
                         did_insert = True

                    for i in line_data0:
                            if 'AI stats' in i:
                                list_z = [float(x.split(":")[1]) for x in i[6].split(",")] 
                                for variable in list_z:
                                    dc1 = list_z[0]
                                    sd1 = list_z[1]
                                    dc1max = list_z[2]
                                    dc1min = list_z[3]
                                    dc2 = list_z[4]
                                    sd2 = list_z[5]
                                    dc2max = list_z[6]
                                    dc2min = list_z[7]
                                    ac = list_z[8]
                                    sd3 = list_z[9]
                                    acmax= list_z[10]
                                    acmin = list_z[11] 
                                                                                               
                                red_zone = 1
                                
                                if dc1min <50 or dc2min <50:
                                    red_zone == 1

                                if dc1max >3000 or dc2max >3000:
                                    red_zone ==1
                                    
                                if dc1 <1 or dc2 <1:
                                    red_zone == 1

                                if ac >= -1 or ac <= 1:
                                    red_zone ==1

                                if acmin < -700:
                                    red_zone ==1

                                if acmax > 700: 
                                    red_zone ==1
                                    
                                if red_zone  ==1:
                                    q2 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'AI stats' AND antena = '{}'".format(mp)) 
                                    cursor.execute(q2) #<-----updaten microbeats
    
                    hb_name = ntpath.basename(newest_file_paths) 
                    cur_time = datetime.now()
                    cur_time_m = cur_time.strftime("%M")
                    
                    md_time = os.path.getctime(newest_file_paths)
                    md_time_m = time.strftime("%M", time.localtime(md_time))
                    l_heared = int(cur_time_m) - int(md_time_m) 
                    
                    line_data1 = list()
                    line_data1.insert(0, hb_name)
                    line_data1.insert(1, l_heared)
                                            
                    statinfo = os.stat(newest_file_paths)
                    
                    if statinfo.st_size < 400:
                        line_data1.insert(2, 'Corrupt')
                    else:
                        line_data1.insert(2, '-_-')
                        
                    line_data10 = list()
                    line_data10.append(line_data1)
                                            
                    q3 = "UPDATE antennas SET lsbeat = %s, lastheared = %s, alarm =%s WHERE `antennas`.`antena` = '{}'".format(mp)
                    
                    for line in line_data10:
                        cursor.execute(q3, line) #<-----aanvullen antennas
                        
                    cur_time = datetime.now()
                    cur_time_f = cur_time.strftime("%Y-%m-%d %H:%M:%S")  
                    
                    line_data2 = list()
                    line_data2.append(cur_time_f)  

                    
                    q4 = ("UPDATE leaving_time SET Datetime = %s") 
                    
                    cursor.execute(q4, line_data2)  #<-----vullen leaving_time
                                                                     
                    q5 = ("INSERT INTO microbeats (antena, datetime, `system`, Color, cave, `nature`, options) (SELECT antena, datetime, `system`, 's', 'TDiff', `nature`, options FROM microbeats WHERE Color = 'M' AND cave = 'tijd' AND antena = '{}')".format(mp))                
                    cursor.execute(q5) #<-----updaten microbeats

                    sys_t_l = []       
                    for i in line_data0:
                        if 'Tijd' in i:
                            
                            sys_t = re.search('Organic Sys:(. ?),', i[6]).group(1)
                            sys_t = sys_t.replace('-', '')
                            sys_t_l.append(sys_t)

                            for var in sys_t_l:
                                dt_sys_t = datetime.strptime(var, " %Y%m%d%H%M%S") 
                                
                                cur_time = datetime.now()
                                                                    
                                time_delta = (dt_sys_t - cur_time)
                                total_seconds = time_delta.total_seconds()
                                minutes = total_seconds/60

                                if minutes > 90:
                                    q6 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'TDiff' AND antena = '{}'".format(mp)) 
                                    cursor.execute(q6)
                                 
                    
                    for file in all_file_paths:
                        if os.path.getctime(file) > time.time() - 120:
                            continue 
                        if os.path.getsize(file) == 0:
                            dst_root = source
                            dst_path_2 = os.path.join(dst_root, mp, os.path.basename(file))
                            os.rename(file, dst_path_2)
                            
                    for file in full_file_paths:
                        if os.path.getctime(file) > time.time() - 120:
                            continue 
                                                   
                        if file == newest_file_paths :
                            dst_root = source
                            dst_path_3 = os.path.join(dst_root, mp, os.path.basename(file))
                            os.rename(file, dst_path_3)
                        else:
                            date_archive = os.path.join(archive, datetime.now().strftime('%Y%m'), mp)
                            dst_root = date_archive
                            if not os.path.exists(dst_root):
                                os.makedirs(dst_root)
                            dst_path_4 = os.path.join(dst_root, os.path.basename(file))
                            os.replace(file, dst_path_4)   
                    
        if did_insert:
            mydb.commit()
        mydb.close()


def main():
    mp_list = get_mp_list()
    pool = Pool(min(cpu_count(), len(mp_list)))
    results = pool.imap_unordered(process_mp, mp_list)
    while True:
        try:
            result = next(results)
        except StopIteration:
            break
        except BaseException as e:
            print(e)

if __name__ == '__main__':
    main()

CodePudding user response:

See my comments to your question above. It's still not clear where the error is occurring so I have added a stack trace and for my own edification some additional statements with #Booboo added to the end, which you should eventually remove. I have also modified the commit processing necessitated by your additional SQL statements.

import mysql.connector
import csv
import os
import time
import re
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count
import traceback

config = configparser.ConfigParser()
config.read(r'Energy_cfg.ini')

source = config['PATHS']['source']
archive = config['PATHS']['archive']

def get_connection():
    mydb = mysql.connector.connect(
            host= config['DB']['host'],
            user = config['DB']['user'],
            passwd = config['DB']['passwd'],
            database= config['DB']['database']
        )
    return mydb

def get_mp_list():
    select_antennas = "SELECT * FROM `antennas`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antennas)

    mp_mysql = [i[0] for i in cursor.fetchall()]
    mp_server = os.listdir(source)

    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()

    mp_list = [mp for mp in mp_mysql if mp in mp_server]
    return mp_list

def process_mp(mp):
    subdir_paths = os.path.join(source, mp)
    for file in os.listdir(subdir_paths):
        file_paths = os.path.join(subdir_paths, file)

        cr_time_s = os.path.getctime(file_paths)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.

    if full_file_paths != []:
        newest_file_paths = max(full_file_paths, key=os.path.getctime)

        mydb = get_connection()
        cursor = mydb.cursor()
        for file in all_file_paths:
            if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                with open(file, 'rt') as f:
                    reader = csv.reader(f, delimiter ='\t')
                    line_data0 = list()
                    col = next(reader)

                    for line in reader:
                        line.insert(0, mp)
                        line.insert(1, cr_time)
                        if line != []: #<----- Control empty directories.
                            line_data0.append(line)

                        q1 = ("INSERT INTO microbeats"
                                     "(`antena`,`datetime`,`system`,`cave`,`Color`, `nature`, `options`)"
                                     "VALUES (%s, %s, %s,%s, %s, %s, %s)")

                    for line in line_data0:
                         cursor.execute(q1, line)

                    for i in line_data0:
                            if 'AI stats' in i:
                                list_z = [float(x.split(":")[1]) for x in i[6].split(",")]
                                for variable in list_z:
                                    dc1 = list_z[0]
                                    sd1 = list_z[1]
                                    dc1max = list_z[2]
                                    dc1min = list_z[3]
                                    dc2 = list_z[4]
                                    sd2 = list_z[5]
                                    dc2max = list_z[6]
                                    dc2min = list_z[7]
                                    ac = list_z[8]
                                    sd3 = list_z[9]
                                    acmax= list_z[10]
                                    acmin = list_z[11]

                                red_zone = 1

                                if dc1min <50 or dc2min <50:
                                    red_zone == 1

                                if dc1max >3000 or dc2max >3000:
                                    red_zone ==1

                                if dc1 <1 or dc2 <1:
                                    red_zone == 1

                                if ac >= -1 or ac <= 1:
                                    red_zone ==1

                                if acmin < -700:
                                    red_zone ==1

                                if acmax > 700:
                                    red_zone ==1

                                if red_zone  ==1:
                                    q2 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'AI stats' AND antena = '{}'".format(mp))
                                    cursor.execute(q2) #<-----updaten microbeats

                    hb_name = ntpath.basename(newest_file_paths)
                    cur_time = datetime.now()
                    cur_time_m = cur_time.strftime("%M")

                    md_time = os.path.getctime(newest_file_paths)
                    md_time_m = time.strftime("%M", time.localtime(md_time))
                    l_heared = int(cur_time_m) - int(md_time_m)

                    line_data1 = list()
                    line_data1.insert(0, hb_name)
                    line_data1.insert(1, l_heared)

                    statinfo = os.stat(newest_file_paths)

                    if statinfo.st_size < 400:
                        line_data1.insert(2, 'Corrupt')
                    else:
                        line_data1.insert(2, '-_-')

                    line_data10 = list()
                    line_data10.append(line_data1)

                    q3 = "UPDATE antennas SET lsbeat = %s, lastheared = %s, alarm =%s WHERE `antennas`.`antena` = '{}'".format(mp)

                    if line_data10:
                        cursor.executemany(q3, line_data10)

                    cur_time = datetime.now()
                    cur_time_f = cur_time.strftime("%Y-%m-%d %H:%M:%S")

                    line_data2 = list()
                    line_data2.append(cur_time_f)


                    q4 = ("UPDATE leaving_time SET Datetime = %s")

                    cursor.execute(q4, line_data2)  #<-----vullen leaving_time

                    q5 = ("INSERT INTO microbeats (antena, datetime, `system`, Color, cave, `nature`, options) (SELECT antena, datetime, `system`, 's', 'TDiff', `nature`, options FROM microbeats WHERE Color = 'M' AND cave = 'tijd' AND antena = '{}')".format(mp))
                    cursor.execute(q5) #<-----updaten microbeats

                    sys_t_l = []
                    for i in line_data0:
                        if 'Tijd' in i:

                            sys_t = re.search('Organic Sys:(. ?),', i[6]).group(1)
                            sys_t = sys_t.replace('-', '')
                            sys_t_l.append(sys_t)

                            for var in sys_t_l:
                                dt_sys_t = datetime.strptime(var, " %Y%m%d%H%M%S")

                                cur_time = datetime.now()

                                time_delta = (dt_sys_t - cur_time)
                                total_seconds = time_delta.total_seconds()
                                minutes = total_seconds/60

                                if minutes > 90:
                                    q6 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'TDiff' AND antena = '{}'".format(mp))
                                    cursor.execute(q6)

                    for file in full_file_paths:
                        if os.path.getctime(file) <= time.time() - 120 and file != newest_file_paths:
                            date_archive = os.path.join(archive, datetime.now().strftime('%Y%m'), mp)
                            dst_root = date_archive
                            if not os.path.exists(dst_root):
                                os.makedirs(dst_root, exist_ok=True)
                            dst_path_4 = os.path.join(dst_root, os.path.basename(file))
                            os.replace(file, dst_path_4)

        mydb.commit()
        mydb.close()


def main():
    mp_list = get_mp_list()
    pool = Pool(min(cpu_count(), len(mp_list)))
    results = pool.imap_unordered(process_mp, mp_list)
    while True:
        try:
            result = next(results)
        except StopIteration:
            break
        except BaseException as e:
            traceback.print_exc()

if __name__ == '__main__':
    main()
  • Related