I have added the multiprocessing part to my code but I receive below error. I dont know what is wrong, I have placed everything inside the __main__
block as I though that would solve the issue, however that did not work.
[WinError None] The process cannot access the file because it is being used by another process: 'c:\\data\\Energy\\Desktop\\Source\\d\\New Text Document.txt' -> 'c:\\data\\Energy\\Desktop\\Source\\d\\New Text Document.txt'
This is my code:
import mysql.connector
import csv
import os
import time
import re
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count
config = configparser.ConfigParser()
config.read(r'Energy_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']
def get_connection():
mydb = mysql.connector.connect(
host= config['DB']['host'],
user = config['DB']['user'],
passwd = config['DB']['passwd'],
database= config['DB']['database']
)
return mydb
def get_mp_list():
select_antennas = "SELECT * FROM `antennas`"
mydb = get_connection()
cursor = mydb.cursor()
cursor.execute(select_antennas)
mp_mysql = [i[0] for i in cursor.fetchall()]
mp_server = os.listdir(source)
# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")
mydb.commit()
mydb.close()
mp_list = [mp for mp in mp_mysql if mp in mp_server]
return mp_list
def process_mp(mp):
subdir_paths = os.path.join(source, mp)
for file in os.listdir(subdir_paths):
file_paths = os.path.join(subdir_paths, file)
cr_time_s = os.path.getctime(file_paths)
cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))
all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.
if full_file_paths != []:
newest_file_paths = max(full_file_paths, key=os.path.getctime)
mydb = get_connection()
cursor = mydb.cursor()
did_insert = False
for file in all_file_paths:
if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
with open(file, 'rt') as f:
reader = csv.reader(f, delimiter ='\t')
line_data0 = list()
col = next(reader)
for line in reader:
line.insert(0, mp)
line.insert(1, cr_time)
if line != []: #<----- Control empty directories.
line_data0.append(line)
q1 = ("INSERT INTO microbeats"
"(`antena`,`datetime`,`system`,`cave`,`Color`, `nature`, `options`)"
"VALUES (%s, %s, %s,%s, %s, %s, %s)")
for line in line_data0:
cursor.execute(q1, line)
did_insert = True
for i in line_data0:
if 'AI stats' in i:
list_z = [float(x.split(":")[1]) for x in i[6].split(",")]
for variable in list_z:
dc1 = list_z[0]
sd1 = list_z[1]
dc1max = list_z[2]
dc1min = list_z[3]
dc2 = list_z[4]
sd2 = list_z[5]
dc2max = list_z[6]
dc2min = list_z[7]
ac = list_z[8]
sd3 = list_z[9]
acmax= list_z[10]
acmin = list_z[11]
red_zone = 1
if dc1min <50 or dc2min <50:
red_zone == 1
if dc1max >3000 or dc2max >3000:
red_zone ==1
if dc1 <1 or dc2 <1:
red_zone == 1
if ac >= -1 or ac <= 1:
red_zone ==1
if acmin < -700:
red_zone ==1
if acmax > 700:
red_zone ==1
if red_zone ==1:
q2 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'AI stats' AND antena = '{}'".format(mp))
cursor.execute(q2) #<-----updaten microbeats
hb_name = ntpath.basename(newest_file_paths)
cur_time = datetime.now()
cur_time_m = cur_time.strftime("%M")
md_time = os.path.getctime(newest_file_paths)
md_time_m = time.strftime("%M", time.localtime(md_time))
l_heared = int(cur_time_m) - int(md_time_m)
line_data1 = list()
line_data1.insert(0, hb_name)
line_data1.insert(1, l_heared)
statinfo = os.stat(newest_file_paths)
if statinfo.st_size < 400:
line_data1.insert(2, 'Corrupt')
else:
line_data1.insert(2, '-_-')
line_data10 = list()
line_data10.append(line_data1)
q3 = "UPDATE antennas SET lsbeat = %s, lastheared = %s, alarm =%s WHERE `antennas`.`antena` = '{}'".format(mp)
for line in line_data10:
cursor.execute(q3, line) #<-----aanvullen antennas
cur_time = datetime.now()
cur_time_f = cur_time.strftime("%Y-%m-%d %H:%M:%S")
line_data2 = list()
line_data2.append(cur_time_f)
q4 = ("UPDATE leaving_time SET Datetime = %s")
cursor.execute(q4, line_data2) #<-----vullen leaving_time
q5 = ("INSERT INTO microbeats (antena, datetime, `system`, Color, cave, `nature`, options) (SELECT antena, datetime, `system`, 's', 'TDiff', `nature`, options FROM microbeats WHERE Color = 'M' AND cave = 'tijd' AND antena = '{}')".format(mp))
cursor.execute(q5) #<-----updaten microbeats
sys_t_l = []
for i in line_data0:
if 'Tijd' in i:
sys_t = re.search('Organic Sys:(. ?),', i[6]).group(1)
sys_t = sys_t.replace('-', '')
sys_t_l.append(sys_t)
for var in sys_t_l:
dt_sys_t = datetime.strptime(var, " %Y%m%d%H%M%S")
cur_time = datetime.now()
time_delta = (dt_sys_t - cur_time)
total_seconds = time_delta.total_seconds()
minutes = total_seconds/60
if minutes > 90:
q6 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'TDiff' AND antena = '{}'".format(mp))
cursor.execute(q6)
for file in all_file_paths:
if os.path.getctime(file) > time.time() - 120:
continue
if os.path.getsize(file) == 0:
dst_root = source
dst_path_2 = os.path.join(dst_root, mp, os.path.basename(file))
os.rename(file, dst_path_2)
for file in full_file_paths:
if os.path.getctime(file) > time.time() - 120:
continue
if file == newest_file_paths :
dst_root = source
dst_path_3 = os.path.join(dst_root, mp, os.path.basename(file))
os.rename(file, dst_path_3)
else:
date_archive = os.path.join(archive, datetime.now().strftime('%Y%m'), mp)
dst_root = date_archive
if not os.path.exists(dst_root):
os.makedirs(dst_root)
dst_path_4 = os.path.join(dst_root, os.path.basename(file))
os.replace(file, dst_path_4)
if did_insert:
mydb.commit()
mydb.close()
def main():
mp_list = get_mp_list()
pool = Pool(min(cpu_count(), len(mp_list)))
results = pool.imap_unordered(process_mp, mp_list)
while True:
try:
result = next(results)
except StopIteration:
break
except BaseException as e:
print(e)
if __name__ == '__main__':
main()
CodePudding user response:
See my comments to your question above. It's still not clear where the error is occurring so I have added a stack trace and for my own edification some additional statements with #Booboo
added to the end, which you should eventually remove. I have also modified the commit processing necessitated by your additional SQL statements.
import mysql.connector
import csv
import os
import time
import re
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count
import traceback
config = configparser.ConfigParser()
config.read(r'Energy_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']
def get_connection():
mydb = mysql.connector.connect(
host= config['DB']['host'],
user = config['DB']['user'],
passwd = config['DB']['passwd'],
database= config['DB']['database']
)
return mydb
def get_mp_list():
select_antennas = "SELECT * FROM `antennas`"
mydb = get_connection()
cursor = mydb.cursor()
cursor.execute(select_antennas)
mp_mysql = [i[0] for i in cursor.fetchall()]
mp_server = os.listdir(source)
# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")
mydb.commit()
mydb.close()
mp_list = [mp for mp in mp_mysql if mp in mp_server]
return mp_list
def process_mp(mp):
subdir_paths = os.path.join(source, mp)
for file in os.listdir(subdir_paths):
file_paths = os.path.join(subdir_paths, file)
cr_time_s = os.path.getctime(file_paths)
cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))
all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.
if full_file_paths != []:
newest_file_paths = max(full_file_paths, key=os.path.getctime)
mydb = get_connection()
cursor = mydb.cursor()
for file in all_file_paths:
if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
with open(file, 'rt') as f:
reader = csv.reader(f, delimiter ='\t')
line_data0 = list()
col = next(reader)
for line in reader:
line.insert(0, mp)
line.insert(1, cr_time)
if line != []: #<----- Control empty directories.
line_data0.append(line)
q1 = ("INSERT INTO microbeats"
"(`antena`,`datetime`,`system`,`cave`,`Color`, `nature`, `options`)"
"VALUES (%s, %s, %s,%s, %s, %s, %s)")
for line in line_data0:
cursor.execute(q1, line)
for i in line_data0:
if 'AI stats' in i:
list_z = [float(x.split(":")[1]) for x in i[6].split(",")]
for variable in list_z:
dc1 = list_z[0]
sd1 = list_z[1]
dc1max = list_z[2]
dc1min = list_z[3]
dc2 = list_z[4]
sd2 = list_z[5]
dc2max = list_z[6]
dc2min = list_z[7]
ac = list_z[8]
sd3 = list_z[9]
acmax= list_z[10]
acmin = list_z[11]
red_zone = 1
if dc1min <50 or dc2min <50:
red_zone == 1
if dc1max >3000 or dc2max >3000:
red_zone ==1
if dc1 <1 or dc2 <1:
red_zone == 1
if ac >= -1 or ac <= 1:
red_zone ==1
if acmin < -700:
red_zone ==1
if acmax > 700:
red_zone ==1
if red_zone ==1:
q2 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'AI stats' AND antena = '{}'".format(mp))
cursor.execute(q2) #<-----updaten microbeats
hb_name = ntpath.basename(newest_file_paths)
cur_time = datetime.now()
cur_time_m = cur_time.strftime("%M")
md_time = os.path.getctime(newest_file_paths)
md_time_m = time.strftime("%M", time.localtime(md_time))
l_heared = int(cur_time_m) - int(md_time_m)
line_data1 = list()
line_data1.insert(0, hb_name)
line_data1.insert(1, l_heared)
statinfo = os.stat(newest_file_paths)
if statinfo.st_size < 400:
line_data1.insert(2, 'Corrupt')
else:
line_data1.insert(2, '-_-')
line_data10 = list()
line_data10.append(line_data1)
q3 = "UPDATE antennas SET lsbeat = %s, lastheared = %s, alarm =%s WHERE `antennas`.`antena` = '{}'".format(mp)
if line_data10:
cursor.executemany(q3, line_data10)
cur_time = datetime.now()
cur_time_f = cur_time.strftime("%Y-%m-%d %H:%M:%S")
line_data2 = list()
line_data2.append(cur_time_f)
q4 = ("UPDATE leaving_time SET Datetime = %s")
cursor.execute(q4, line_data2) #<-----vullen leaving_time
q5 = ("INSERT INTO microbeats (antena, datetime, `system`, Color, cave, `nature`, options) (SELECT antena, datetime, `system`, 's', 'TDiff', `nature`, options FROM microbeats WHERE Color = 'M' AND cave = 'tijd' AND antena = '{}')".format(mp))
cursor.execute(q5) #<-----updaten microbeats
sys_t_l = []
for i in line_data0:
if 'Tijd' in i:
sys_t = re.search('Organic Sys:(. ?),', i[6]).group(1)
sys_t = sys_t.replace('-', '')
sys_t_l.append(sys_t)
for var in sys_t_l:
dt_sys_t = datetime.strptime(var, " %Y%m%d%H%M%S")
cur_time = datetime.now()
time_delta = (dt_sys_t - cur_time)
total_seconds = time_delta.total_seconds()
minutes = total_seconds/60
if minutes > 90:
q6 = ("Update microbeats SET `nature` = 'SIZED' WHERE cave = 'TDiff' AND antena = '{}'".format(mp))
cursor.execute(q6)
for file in full_file_paths:
if os.path.getctime(file) <= time.time() - 120 and file != newest_file_paths:
date_archive = os.path.join(archive, datetime.now().strftime('%Y%m'), mp)
dst_root = date_archive
if not os.path.exists(dst_root):
os.makedirs(dst_root, exist_ok=True)
dst_path_4 = os.path.join(dst_root, os.path.basename(file))
os.replace(file, dst_path_4)
mydb.commit()
mydb.close()
def main():
mp_list = get_mp_list()
pool = Pool(min(cpu_count(), len(mp_list)))
results = pool.imap_unordered(process_mp, mp_list)
while True:
try:
result = next(results)
except StopIteration:
break
except BaseException as e:
traceback.print_exc()
if __name__ == '__main__':
main()