sample csv file:
test.csv
process_cd
ramsize
dbsize
protocal
random
function will be called with below parameters
self.complete_stage_purge_process(self.targetCnxn, self.targetTable, self.processCode)
sample process codes:
test
protocal
forensic
each time function is called need to read the csv file for those process codes and if process code matches then bypass internal delete call
def complete_stage_purge_process(self, target_cnxn, stage_table, process_cd):
delete_sql = "delete from " schemaName.STAGE.value "." \
stage_table " where run_pk in (" run_pk_sql ")"
try:
trgt_cursor = target_cnxn.cursor()
trgt_cursor.execute(delete_sql)
target_cnxn.commit()
self.logger.debug("deletes processed successfully ")
target_cnxn.close()
except:
self.logger.exception('Error in processing deletes')
raise
else:
self.logger.debug('purge process is not required for this process')
how to achieve that csv read in loop
Tried with below piece of code but the code is still going to purge process and not running the process code search in loop
non_purge_process_file = open(self.file_path)
reader = csv.reader(non_purge_process_file)
for row in reader:
if process_cd in row:
self.logger.debug("Do not perform stage purge process.")
return
else:
delete_dt = datetime.today() - timedelta(days=30)
delete_dt = str(delete_dt)
CodePudding user response:
I achieved solution to above problem using below approach:
def check_process_cd(self, process_cd):
self.logger.debug(datetime.now())
self.logger.debug('check_process_cd')
purge_process_flag = 0
reader = csv.reader(purge_process_codes_file)
for row in reader:
if process_cd == row[0]:
self.logger.debug("Perform stage purge process.")
purge_process_flag = 1
return purge_process_flag
based on flag perform the function call
if self.purge_process_flag == 1:
self.complete_stage_purge_process(self.targetCnxn, self.targetTable, self.processCode)
else:
self.logger.debug('Do not perform purge process')
CodePudding user response:
import csv
def validate_and_purge(input_param, csv_file):
found = False # flag to indicate whether input_param is found in the CSV file
with open(csv_file, 'r') as f:
reader = csv.reader(f)
for row in reader:
if input_param in row:
found = True
break
if found:
return True
return False