I am trying to convert this converter (XML files from S3 to JSON) into a multithreaded application so I can speed up processing of multiple files (I have 985). Whenever I run it I get: RuntimeWarning: coroutine 'process_object' was never awaited
Here is the code at a high level:
import asyncio
import concurrent.futures
import time

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed {filename} in {time.time() - start} seconds"
if "__main__" == __name__:
objects = get_objects(top_n=3) # list of prefixes for S3
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [
asyncio.wrap_future(future)
for future in [
loop.run_in_executor(executor, process_object, url) for url in objects
]
]
results = loop.run_until_complete(asyncio.gather(*futures))
loop.close()
CodePudding user response:
I have modified and simplified your code. I don't know why you are combining thread pool futures with asyncio: loop.run_in_executor expects a plain callable, so each worker thread just calls process_object(url) and gets back a coroutine object that nothing ever awaits, which is exactly what the RuntimeWarning is telling you. If you want to limit the number of workers, you can use a semaphore in asyncio instead (see the sketch at the end of this answer).
Below is the simplified code without concurrent.futures. I can't reproduce your exact error locally, but this version works for me.
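As an aside, if you really wanted to keep the ThreadPoolExecutor, each worker thread would have to drive the coroutine to completion itself, for example with asyncio.run. A minimal sketch of that pattern, reusing process_object and get_objects from your code:

import asyncio
import concurrent.futures

def run_in_thread(url):
    # asyncio.run creates a fresh event loop in this worker thread
    # and awaits the coroutine, so nothing is left un-awaited.
    return asyncio.run(process_object(url))

if __name__ == "__main__":
    objects = get_objects(top_n=3)
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(run_in_thread, objects))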
Try this:
import asyncio
import time

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed {filename} in {time.time() - start} seconds")

async def process_objects_bg(objects):
    return await asyncio.gather(*[process_object(url) for url in objects])

if __name__ == "__main__":
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))
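As mentioned above, if you want to cap how many of your 985 files are processed at once, an asyncio.Semaphore replaces the thread pool's max_workers. A sketch reusing process_object from above; the limit of 3 just mirrors your original max_workers value:

import asyncio

async def process_with_limit(sem, url):
    # At most `limit` of these bodies run concurrently;
    # the rest wait at the semaphore.
    async with sem:
        return await process_object(url)

async def process_objects_bg(objects, limit=3):
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*[process_with_limit(sem, url) for url in objects])

Note also that s3.download_from_s3 looks like a synchronous call; if it is, it will block the event loop while it runs, so you may want to offload it inside the coroutine with asyncio.to_thread (Python 3.9+) or loop.run_in_executor.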