I have created a greatly simplified version of an application below that intends to use Python's asyncio and threading modules. The general structure is as follows:
import asyncio
import threading
class Node:
def __init__(self, loop):
self.loop = loop
self.tasks = set()
async def computation(self, x):
print("Node: computation called with input ", x)
await asyncio.sleep(1)
def schedule_computation(self, x):
print("Node: schedule_computation called with input ", x)
task = self.loop.create_task(self.computation(x))
self.tasks.add(task)
class Router:
def __init__(self, loop):
self.loop = loop
self.nodes = {}
def register_node(self, id):
self.nodes[id] = Node(self.loop)
def schedule_computation(self, node_id, x):
print("Router: schedule_computation called with input ", x)
self.nodes[node_id].schedule_computation(x)
class Client:
def __init__(self, router):
self.router = router
self.counter = 0
def run(self):
while True:
if self.counter == 1000000:
self.router.schedule_computation(1, 5)
self.counter = 1
def main():
loop = asyncio.get_event_loop()
# construct Router instance and register a node
router = Router(loop)
router.register_node(1)
# construct Client instance
client = Client(router)
client_thread = threading.Thread(target=client.run)
client_thread.start()
loop.run_forever()
main()
In practice the Node.computation method is doing some network I/O and thus I'd like to perform said work asynchronously. The Client.run method is synchronous and blocking and I'd like to give this function it's own thread to execute in (in fact I'd like the ability to run this method in a separate process if possible).
Upon executing this application we get the following output:
Router: schedule_computation called with input 5
Node: schedule_computation called with input 5
However, I expect that "Node: computation called with input 5" should print as well because the Node.schedule_computation method creates a task to run on loop. In summary, why does it seem that Node.computation is never scheduled?
CodePudding user response:
Use loop.call_soon_threadsafe
In general, asyncio isn't thread safe
Almost all asyncio objects are not thread safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback. If there’s a need for such code to call a low-level asyncio API, the loop.call_soon_threadsafe() method should be used
https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading
SCHEDULE COMPUTATION
loop.call_soon_threadsafe(self.nodes[node_id].schedule_computation,x)
Node.computation
runs on main thread
Not sure if you are aware, but even though you can use call_soon_threadsafe
to initiate a coroutine from another thread. The coroutine always runs in the thread the loop was created in. If you want to run coroutines on another thread, then your background thread will need its own EventLoop
also.