I have a class inside a microservice that looks like this:
import asyncio
import threading
class A:
def __init__(self):
self.state = []
self._flush_thread = self._start_flush()
self.tasks = set()
def _start_flush(self):
threading.Thread(target=self._submit_flush).start()
def _submit_flush(self):
self._thread_loop = asyncio.new_event_loop()
self._thread_loop.run_until_complete(self.flush_state()) #
async def regular_func(self):
# This function is called on an event loop that is managed by asyncio.run()
# process self.state, fire and forget next func
task = asyncio.create_task(B.process_inputs(self.state)) # Should call process_inputs in the main thread event loop
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
pass
async def flush_state(self):
# flush out self.state at regular intervals, to next func
while True:
# flush state
asyncio.run_coroutine_threadsafe(B.process_inputs(self.state), self._thread_loop) # Calls process_inputs in the new thread event loop
await asyncio.sleep(10)
pass
class B:
@staticmethod
async def process_inputs(self, inputs):
# process
On these two threads, I have two separate event loops to avoid any other async functions in the main event loop from blocking other asyncio functions from running.
I see that asyncio.run_coroutine_threadsafe
is thread safe when submitting to a given event loop. Is asyncio.run_coroutine_threadsafe(B.process_inputs())
called between different event loops still threadsafe?
Edit:
process_inputs
uploads the state to an object store and calls an external API using the state we passed in.
CodePudding user response:
The answer here is that asyncio.run_coroutine_threadsafe
does not protect us from any thread safety issues across different event loops. We need to implement locks to protect any shared states while they are being modified. Credits to @Paul Cornelius for the reply.