Create another thread for an await function

Time:10-26

I'm working with a web server for the first time. I have worked with sockets and parallelism before, but that was much simpler and did not use async for concurrency.

My goal is simple: I have a server and a client. In the client I want to create a separate thread that receives the messages the server sends, while the original thread does other things, as in this example (client.py):

from typing import Dict
import websockets
import asyncio
import json

URL = "my localhost webserver"
connection = None

async def listen() -> None:
    global connection

    input("Press enter to connect.")
    
    async with websockets.connect(URL) as ws:
        connection = ws

        msg_initial: Dict[str,str] = get_dict()
        await ws.send(json.dumps(msg_initial))
        

        # I want this to run in another thread
        await receive_msg()

        print("I`m at listener`s thread")

        # do some stuff

async def receive_msg() -> None:
    while True:
        msg = await connection.recv()
        print(f"Server: {msg}")

asyncio.get_event_loop().run_until_complete(listen())

To receive a message I need to await recv(), but I don't know how to run that in a separate thread. I have already tried using threading to create the thread, but it didn't work.

Does anyone know whether this is possible and, if so, how to do it?

CodePudding user response:

It's not clear that what you want to do can be done in exactly the way you propose. In the following example I connect to an echo server. The most direct way to implement what you suggest is to create a new thread and pass the connection to it. But this does not quite work:

import websockets
import asyncio
from threading import Thread

URL = "ws://localhost:4000"

async def listen() -> None:
    async with websockets.connect(URL) as ws:
        # pass connection:
        t = Thread(target=receiver_thread, args=(ws,))
        t.start()
        # Generate some messages to be echoed back:
        await ws.send('msg1')
        await ws.send('msg2')
        await ws.send('msg3')
        await ws.send('msg4')
        await ws.send('msg5')

def receiver_thread(connection):
    print("I`m at listener`s thread")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(receive_msg(connection))

async def receive_msg(connection) -> None:
    while True:
        msg = await connection.recv()
        print(f"Server: {msg}")

asyncio.get_event_loop().run_until_complete(listen())

Prints:

I`m at listener`s thread
Server: msg1
Server: msg2
Server: msg3
Server: msg4
Server: msg5
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Program Files\Python38\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Program Files\Python38\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Ron\test\test.py", line 22, in receiver_thread
    loop.run_until_complete(receive_msg(connection))
  File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\Ron\test\test.py", line 29, in receive_msg
    msg = await connection.recv()
  File "C:\Program Files\Python38\lib\site-packages\websockets\legacy\protocol.py", line 404, in recv
    await asyncio.wait(
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 667, in ensure_future
    raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument

The messages are received okay but the problem occurs in function receiver_thread on the statement:

loop.run_until_complete(receive_msg(connection))

By necessity, the started thread has no running event loop and cannot use the event loop that listen is running on, so it must create a new one. That would be fine if this thread and its event loop did not use any resources (i.e. the connection) that belong to a different event loop:

import websockets
import asyncio
from threading import Thread

URL = "ws://localhost:4000"

async def listen() -> None:
    async with websockets.connect(URL) as ws:
        t = Thread(target=receiver_thread)
        t.start()

def receiver_thread():
    print("I`m at listener`s thread")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(receive_msg())

async def receive_msg() -> None:
    await asyncio.sleep(2)
    print('I just slept for 2 seconds')

asyncio.get_event_loop().run_until_complete(listen())

Prints:

I`m at listener`s thread
I just slept for 2 seconds
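
If the thread does need data from the connection, one possible way around the restriction is to leave the connection on the loop that created it and have the thread submit coroutines to that loop with asyncio.run_coroutine_threadsafe. The following is only a minimal sketch: an asyncio.Queue named inbox stands in for the websocket connection so it runs without a server, and worker, main and received are hypothetical names.

```python
import asyncio
from threading import Thread

received = []  # filled by the worker thread


def worker(loop: asyncio.AbstractEventLoop, inbox: asyncio.Queue) -> None:
    # Runs in a plain thread; no second event loop is created here.
    for _ in range(3):
        # Schedule inbox.get() on the loop that owns the queue, then block
        # this thread (not the loop) until the result is available.
        future = asyncio.run_coroutine_threadsafe(inbox.get(), loop)
        received.append(future.result(timeout=5))


async def main() -> None:
    loop = asyncio.get_running_loop()
    inbox = asyncio.Queue()  # assumption: stand-in for the connection
    t = Thread(target=worker, args=(loop, inbox))
    t.start()
    for i in range(1, 4):
        await inbox.put(f"msg{i}")
    # Wait for the thread to finish without blocking the event loop.
    await loop.run_in_executor(None, t.join)


asyncio.run(main())
print(received)
```

With a real connection the thread would submit connection.recv() instead of inbox.get(). The coroutine still executes on the original loop, so no future ever crosses from one loop to another.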

Based on the minimal code you have shown, I see no real need to run anything in threads. But assuming you omitted some processing of the received message for which asyncio alone is not sufficient, perhaps all you need is to receive the messages in the currently running loop (in function listen) and use threading only to process each message:

from typing import Dict
import websockets
import asyncio
import json
from threading import Thread

URL = "my localhost webserver"

async def listen() -> None:

    input("Press enter to connect.")

    async with websockets.connect(URL) as ws:

        msg_initial: Dict[str,str] = get_dict()
        await ws.send(json.dumps(msg_initial))

        while True:
            msg = await ws.recv()
            print(f"Server: {msg}")
            # process_msg is your own message-handling function (not shown).
            # These are non-daemon threads, so the program will not end until they terminate:
            t = Thread(target=process_msg, args=(msg,))
            t.start()

asyncio.get_event_loop().run_until_complete(listen())
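
A variation on the example above is to hand each message to the loop's default thread pool with loop.run_in_executor rather than starting one Thread per message. The pool reuses its threads and the call returns an awaitable result. This is a sketch under assumptions: process_msg is a hypothetical blocking function (the real one is not shown in the question), and a plain list of strings stands in for the messages that ws.recv() would deliver.

```python
import asyncio
import time


def process_msg(msg: str) -> str:
    # Hypothetical blocking work that would otherwise stall the event loop.
    time.sleep(0.1)
    return msg.upper()


async def listen(messages) -> list:
    loop = asyncio.get_running_loop()
    pending = []
    for msg in messages:  # stands in for: msg = await ws.recv()
        # Run process_msg in the default ThreadPoolExecutor; the loop stays
        # free to receive the next message in the meantime.
        pending.append(loop.run_in_executor(None, process_msg, msg))
    # gather returns the results in the order the futures were passed in.
    return await asyncio.gather(*pending)


results = asyncio.run(listen(["msg1", "msg2", "msg3"]))
print(results)
```

Whether this is preferable to explicit Thread objects depends on what the processing does. The pool limits how many messages are processed at once, which may or may not be what you want.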

Update

Based on your last comment on my answer about creating a chat program, you should implement this using either pure multithreading or pure asyncio. Here is a rough outline using asyncio:

import websockets
import asyncio
import aioconsole

URL = "my localhost webserver"

async def receiver(connection):
    while True:
        msg = await connection.recv()
        print(f"\nServer: {msg}")

async def sender(connection):
    while True:
        msg = await aioconsole.ainput('\nEnter msg: ')
        await connection.send(msg)

async def chat() -> None:
    async with websockets.connect(URL) as ws:
        await asyncio.gather(
            receiver(ws),
            sender(ws)
        )

asyncio.get_event_loop().run_until_complete(chat())

However, you may be limited in the kind of user input you can handle with asyncio. I would think, therefore, that multithreading might be the better approach.
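
A pure multithreading layout could look roughly like the sketch below. It assumes a blocking connection object with send() and recv() methods. EchoConnection is a hypothetical stand-in built on queue.Queue so the example runs without a server, and a fixed list of lines replaces the calls to input(). Recent releases of websockets also include a synchronous client in websockets.sync.client, which may fit this layout, but it is not used here.

```python
import queue
import threading


class EchoConnection:
    """Hypothetical stand-in for a blocking connection to an echo server."""

    def __init__(self) -> None:
        self._q = queue.Queue()

    def send(self, msg: str) -> None:
        self._q.put(msg)  # the "server" echoes the message straight back

    def recv(self):
        return self._q.get()  # blocks until a message is available

    def close(self) -> None:
        self._q.put(None)  # sentinel telling the receiver to stop


received = []


def receiver(conn: EchoConnection) -> None:
    # Runs in its own thread and blocks in recv() between messages.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        received.append(msg)
        print(f"Server: {msg}")


def sender(conn: EchoConnection, lines) -> None:
    for line in lines:  # in a real client: line = input("Enter msg: ")
        conn.send(line)
    conn.close()


conn = EchoConnection()
t = threading.Thread(target=receiver, args=(conn,))
t.start()
sender(conn, ["hello", "world"])  # the main thread acts as the sender
t.join()
```

Here the main thread is free to block on user input while the receiver thread blocks on the connection, which is the split the question asks for.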
