Home > Enterprise >  How to avoid blocking the asyncio event loop with looping functions
How to avoid blocking the asyncio event loop with looping functions

Time:04-06

I'm using FastAPI with WebSockets to "push" SVGs to the client. The problem is: If iterations run continuously, they block the async event loop and the socket therefore can't listen to other messages.

Running the loop as a background task is not suitable because each iteration is CPU heavy and the data must be returned to the client.

Is there a different approach, or will I need to trigger each step from the client? I thought multiprocessing could work but not sure how this would work with asynchronous code like await websocket.send_text().

My first S/O question, so any help appreciated!

@app.websocket("/ws")
async def read_websocket(websocket: WebSocket) -> None:
    await websocket.accept()
    while True:
        data = await websocket.receive_text()

        async def run_continuous_iterations():
            #needed to run the steps until the user sends "stop"
            while True:
                svg_string = get_step_data()
                await websocket.send_text(svg_string) 

        if data == "status":
            await run_continuous_iterations()
        #this code can't run if the event loop is blocked by run_continuous_iterations
        if data == "stop":
            is_running = False
            print("Stopping process")

CodePudding user response:

"...each iteration is CPU heavy and the data must be returned to the client".

As described in this answer, a "coroutine suspends its execution only when it explicitly requests to be suspended", e.g., if there is an await call to I/O-bound operations, such as the ones described here. This, however, does not apply to CPU-bound operations, such as the ones mentioned here. Thus, CPU-bound operations, even if they are declared in async def functions and called using await, will block the event loop; and hence, any other requests will get blocked.

Additionally, from the code snippet your provided, you seem that you would like to be sending data back to the client, while at the same time listening to new messages (in order to check if the client sent a "stop" msg to stop the process). Thus, awaiting for an operation to be completed is not the way to go, but rather starting a thread/process to execute that task. Solutions below.

Using asyncio's run_in_executor:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    is_running = True
    await websocket.accept()
    
    try:
        while True:
            data = await websocket.receive_text()

            async def run_continuous_iterations():
                while is_running:
                    svg_string = get_step_data()
                    await websocket.send_text(svg_string)
                
            if data == "status":
                is_running = True
                loop = asyncio.get_running_loop()
                loop.run_in_executor(None, lambda: asyncio.run(run_continuous_iterations()))

            if data == "stop":
                is_running = False
                print("Stopping process")
                
    except WebSocketDisconnect:
        is_running = False
        print("Client disconnected")  

Using threading's Thread:

#...  rest of the code is the same as above
                
if data == "status":
    is_running = True
    thread = threading.Thread(target=lambda: asyncio.run(run_continuous_iterations()))
    thread.start()

#...  rest of the code is the same as above
  • Related