In my Python application, I do a lot of data processing that ends up generating many small files, sometimes more than 20,000 per job. Later in the processing flow, I upload all of these files to S3 storage. The problem is that sometimes not all files reach S3, which I don't understand, as I explicitly check that each file is there:
count_lock = threading.Lock()
obj_count = 0

def __upload(object_path_pair):
    global obj_count
    sleep_time = 5
    num_retries = 10
    for x in range(0, num_retries):
        try:
            libera_resource.upload_file(*object_path_pair)
            sleep(random.uniform(1, 5))
            with count_lock:
                libera_resource_status = libera_resource.Object(object_path_pair[1]).get()['ResponseMetadata'].get('HTTPStatusCode')
                if libera_resource_status == 200 and obj_count > 0:
                    print(f'Item: {file_name} - HLS segment {obj_count} / {len(segment_upload_list)} uploaded successfully.')
                elif libera_resource_status != 200:
                    print(f'Item: {file_name} - HLS segment {obj_count} / {len(segment_upload_list)} uploaded failed, will be tried again.')
                obj_count = 1
            upload_error = None
        except Exception as upload_error:
            pass
        if upload_error or libera_resource_status != 200:
            sleep(sleep_time)  # wait before trying to fetch the data again
            sleep_time *= 2
        else:
            break

def upload_segments(segment_upload_list):
    global obj_count
    obj_count = 0
    with ThreadPoolExecutor(max_workers=100) as executor:
        executor.map(__upload, segment_upload_list)

upload_segments(segment_upload_list)
Here, libera_resource is basically a boto3.resource. Can somebody tell me where and why I might sometimes miss a file?
Thanks in advance
CodePudding user response:
This code probably isn't doing what you expect when an exception is encountered:
try:
    # (stuff)
    upload_error = None
except Exception as upload_error:
    pass
if upload_error or libera_resource_status != 200:
    # more stuff
If an exception is encountered, it's assigned to upload_error for the duration of the except clause, but upload_error is then deleted on exit from the except clause. See PEP 3110 and this Reddit discussion.
So if you get an exception, the subsequent if statement raises an UnboundLocalError (because upload_error is now unassigned), and you've crashed out of your __upload function without retrying.
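To see the deletion described above in isolation, here is a minimal sketch with no S3 involved. The function names demo and retry_check_outcome are made up for illustration, and the ValueError merely stands in for whatever upload_file might raise.

```python
def demo(fail):
    try:
        if fail:
            raise ValueError("simulated upload failure")
        upload_error = None
    except Exception as upload_error:
        pass
    # If the except clause ran, Python deleted upload_error when it exited,
    # so the name is unbound here and reading it raises UnboundLocalError.
    return upload_error


def retry_check_outcome(fail):
    """Report what reading upload_error after the try/except produces."""
    try:
        return repr(demo(fail))
    except UnboundLocalError as exc:
        return type(exc).__name__


print(retry_check_outcome(False))  # prints None
print(retry_check_outcome(True))   # prints UnboundLocalError
```

In the success case the name is still bound to None, which is why the code appears to work until the first failed upload.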
This won't cause the other threads in your pool to fail, so it's easy to miss if you're not checking for it.
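One possible way to restructure this, as a sketch rather than a drop-in replacement: copy the exception to a name that outlives the except clause, and collect the futures so that failures become visible. Note that executor.map only re-raises a worker's exception when its result iterator is consumed, which the original code never does. The names upload_with_retry, upload_all and upload_one are hypothetical; upload_one is assumed to be any callable that uploads a single item and raises on failure, for example a small wrapper around libera_resource.upload_file.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def upload_with_retry(upload_one, item, num_retries=3, sleep_time=0.01):
    """Call upload_one(item) until it succeeds or the retries run out."""
    last_error = None
    for attempt in range(num_retries):
        try:
            upload_one(item)
            return  # success, stop retrying
        except Exception as exc:
            # Copy to a name that survives the end of the except clause.
            last_error = exc
        if attempt < num_retries - 1:
            sleep(sleep_time)  # back off before the next attempt
            sleep_time *= 2
    raise RuntimeError(f"giving up on {item!r}") from last_error


def upload_all(upload_one, items, max_workers=8):
    """Upload all items in a thread pool; return those that still failed."""
    failed = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(upload_with_retry, upload_one, item): item
            for item in items
        }
        for future in as_completed(futures):
            # Checking each future keeps worker exceptions from being lost.
            if future.exception() is not None:
                failed.append(futures[future])
    return failed
```

Calling upload_all(my_upload, segment_upload_list), where my_upload is your own wrapper, would then return the list of items that never made it, which can be logged or retried instead of disappearing silently.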