Home > OS >  Python Asyncio and Multithreading
Python Asyncio and Multithreading

Time:09-07

I have created a greatly simplified version of an application below that intends to use Python's asyncio and threading modules. The general structure is as follows:

import asyncio
import threading

class Node:
    def __init__(self, loop):
        self.loop = loop
        self.tasks = set()

    async def computation(self, x):
        print("Node: computation called with input ", x)
        await asyncio.sleep(1)

    def schedule_computation(self, x):
        print("Node: schedule_computation called with input ", x)
        task = self.loop.create_task(self.computation(x))
        self.tasks.add(task)

class Router:
    def __init__(self, loop):
        self.loop = loop
        self.nodes = {}

    def register_node(self, id):
        self.nodes[id] = Node(self.loop)

    def schedule_computation(self, node_id, x):
        print("Router: schedule_computation called with input ", x)
        self.nodes[node_id].schedule_computation(x)

class Client:
    def __init__(self, router):
        self.router = router
        self.counter = 0

    def run(self):
        while True:
            if self.counter == 1000000:
                self.router.schedule_computation(1, 5)

            self.counter  = 1
    
def main():
    loop = asyncio.get_event_loop()

    # construct Router instance and register a node
    router = Router(loop)
    router.register_node(1)

    # construct Client instance
    client = Client(router)

    client_thread = threading.Thread(target=client.run)
    client_thread.start()

    loop.run_forever()

main()

In practice the Node.computation method is doing some network I/O and thus I'd like to perform said work asynchronously. The Client.run method is synchronous and blocking and I'd like to give this function it's own thread to execute in (in fact I'd like the ability to run this method in a separate process if possible).

Upon executing this application we get the following output:

Router: schedule_computation called with input  5
Node: schedule_computation called with input  5

However, I expect that "Node: computation called with input 5" should print as well because the Node.schedule_computation method creates a task to run on loop. In summary, why does it seem that Node.computation is never scheduled?

CodePudding user response:

Use loop.call_soon_threadsafe

In general, asyncio isn't thread safe

Almost all asyncio objects are not thread safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback. If there’s a need for such code to call a low-level asyncio API, the loop.call_soon_threadsafe() method should be used

https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading

SCHEDULE COMPUTATION

loop.call_soon_threadsafe(self.nodes[node_id].schedule_computation,x)

Node.computation runs on main thread

Not sure if you are aware, but even though you can use call_soon_threadsafe to initiate a coroutine from another thread. The coroutine always runs in the thread the loop was created in. If you want to run coroutines on another thread, then your background thread will need its own EventLoop also.

  • Related