Keep Websockets connection open for incoming requests

Time:10-07

I have a Flask server that accepts HTTP requests from a client. This HTTP server needs to delegate work to a third-party server using a websocket connection (for performance reasons).

I find it hard to wrap my head around how to create a permanent websocket connection that can stay open for HTTP requests. Sending requests to the websocket server in a run-once script works fine and looks like this:

async def send(websocket, payload):
    await websocket.send(json.dumps(payload).encode("utf-8"))

async def recv(websocket):
    data = await websocket.recv()
    return json.loads(data)

async def main(payload):
    uri = f"wss://the-third-party-server.com/xyz"
    async with websockets.connect(uri) as websocket:
        future = send(websocket, payload)
        future_r = recv(websocket)
        _, output = await asyncio.gather(future, future_r)
    return output

asyncio.get_event_loop().run_until_complete(main({...}))

Here, main() establishes a WSS connection and closes it when done, but how can I keep that connection open for incoming HTTP requests, so that I can call main() for each of them without re-establishing the WSS connection?

CodePudding user response:

The main problem is that code in a web app responding to HTTP(S) has a very particular "life cycle": usually you have a "view" function that gets the request data, performs all the actions needed to gather the response data, and returns it.

In most web frameworks this "view" function has to be independent of the rest of the system: it should be able to do its job relying on no data or objects other than what it gets when called, namely the request data and the system configuration. That independence lets the application server (the framework parts designed to actually connect your program to the internet) choose among a variety of ways to serve your program: it may run your view function in several parallel threads, in several parallel processes, or even in different processes across various containers or physical servers. Your application does not need to care about that.

If you want a resource that is available across calls to your view functions, you need to break out of this paradigm. For example, frameworks will typically want to create a pool of database connections, so that views in the same process can re-use those connections. These database connections are usually supplied by the framework itself, which implements a mechanism for allowing them to be reused and to be available transparently, as needed. You have to recreate a mechanism of the same sort if you want to keep a websocket connection alive.

In a certain way, you need a Python object that can mediate your websocket data behaving like a "server" for your web view functions.

That is simpler to do than it sounds: a special Python class designed to have a single instance per process, which keeps the connection and can send and receive data for parallel calls without mangling it, is enough. A callable that ensures this instance exists in the current process will then work under any strategy configured to serve your app to the web.
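The "callable that ensures the instance exists" could look roughly like the sketch below. It is only an illustration: `Client` and `get_client` are hypothetical names, and `Client` stands in for whatever object ends up owning the websocket. The process-id check is an assumption about forking application servers, where a worker inherits the parent's module globals but not its threads or sockets.

```python
import os
import threading

_lock = threading.Lock()
_client = None
_client_pid = None


class Client:
    """Hypothetical stand-in for the object that owns the websocket."""

    def __init__(self):
        self.pid = os.getpid()


def get_client():
    """Return the Client for the current process, creating it on first use."""
    global _client, _client_pid
    with _lock:
        # A client created in another process (before a fork) is stale here,
        # so it is replaced the first time this process asks for one.
        if _client is None or _client_pid != os.getpid():
            _client = Client()
            _client_pid = os.getpid()
        return _client
```

A view would then call `get_client()` on every request; the lock only guards creation, so concurrent first calls from several threads still end up sharing one instance.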

If you are using Flask, which does not use asyncio, you get a further complication: you lose the async ability inside your views, so they have to wait for the websocket request to complete. It is then the job of your application server to run your views in different threads or processes to ensure availability, and it is your job to have the asyncio loop for your websocket running in a separate thread, so that it can make the requests it needs.
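As a minimal sketch of that last point, the standard library's `asyncio.run_coroutine_threadsafe` lets synchronous code hand a coroutine to an event loop running in another thread and block until the result is ready. `LoopThread` and `fake_request` are hypothetical names used only for illustration; `fake_request` stands in for a real websocket round trip.

```python
import asyncio
import threading


class LoopThread:
    """Runs an asyncio event loop in a background thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def call(self, coro, timeout=None):
        """Submit a coroutine from a synchronous thread and wait for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout)

    def stop(self):
        # The loop must be stopped from inside its own thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()


async def fake_request(payload):
    # Hypothetical stand-in for a websocket send/recv round trip.
    await asyncio.sleep(0.01)
    return {"echo": payload}
```

A synchronous view could then do something like `LoopThread().call(fake_request(data), timeout=5)`, ideally on one shared `LoopThread` per process rather than a new one per request.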

Here is some example code. Please note that, apart from using a single websocket per process, it has no provisions for failure of any kind and, most importantly, it does nothing in parallel: all send-recv pairs are blocking, as you give no clue of a mechanism that would allow one to pair each outgoing message with its response.

import asyncio
import json
import threading
from queue import Queue

import websockets
 

class AWebSocket:
    instance = None

    def __new__(cls, *args, **kw):
        # Create the single per-process instance on first use, then reuse it
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    def __init__(self, *args, **kw):
        # init will be called even if new finds the existing instance,
        # so we have to check whether it has already been set up
        if hasattr(self, "socket_thread"):
            return
        self.outgoing = Queue()
        self.responses = Queue()
        # daemon=True so this thread does not keep the process alive on exit
        self.socket_thread = threading.Thread(target=self.start_socket, daemon=True)
        self.socket_thread.start()


    def start_socket(self):
        # starts an async loop in a separate thread, and keeps
        # the web socket running in this separate thread
        # (asyncio.run creates a fresh event loop for the thread)
        asyncio.run(self.core())

    async def _send(self, websocket, payload):
        await websocket.send(json.dumps(payload).encode("utf-8"))

    async def _recv(self, websocket):
        data = await websocket.recv()
        return json.loads(data)

    async def core(self):
        uri = "wss://the-third-party-server.com/xyz"
        async with websockets.connect(uri) as websocket:
            self.websocket = websocket
            while True:
                # Poll instead of calling the blocking Queue.get() right away:
                # blocking here would freeze the event loop, and with it the
                # websocket's keepalive handling, while no request is pending.
                if self.outgoing.empty():
                    await asyncio.sleep(0.01)
                    continue
                # This code is as you wrote it:
                # it essentially blocks until a message is sent
                # and the answer is received back.
                # You need a mechanism in your websocket messages
                # that lets you identify the corresponding answer
                # to each request. Once you have one, this is trivially
                # parallelizable simply by calling asyncio.create_task
                # instead of awaiting asyncio.gather
                payload = self.outgoing.get()
                future = self._send(websocket, payload)
                future_r = self._recv(websocket)
                _, response = await asyncio.gather(future, future_r)
                self.responses.put(response)

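    # Serializes callers, so that views running in parallel threads
    # each get the response to their own payload
    _send_lock = threading.Lock()

    def send(self, payload):
        # This is the method you call from your views
        # simply do:
        # `output = AWebSocket().send(payload)`
        with self._send_lock:
            self.outgoing.put(payload)
            return self.responses.get()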
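If the third-party protocol lets you tag each message, the pairing mechanism mentioned in the comments above might be sketched as follows. Everything here is an assumption for illustration: the `id`, `value`, `delay` and `result` fields are invented, `Multiplexer` is a hypothetical name, and `FakeSocket` is an in-memory stand-in that deliberately answers out of order so the example runs without a real server.

```python
import asyncio
import itertools
import json


class FakeSocket:
    """Hypothetical in-memory stand-in for a websocket; replies out of order."""

    def __init__(self):
        self._replies = asyncio.Queue()

    async def send(self, data):
        msg = json.loads(data)
        reply = json.dumps({"id": msg["id"], "result": msg["value"] * 2})
        # Larger "delay" values are answered later, so replies arrive shuffled.
        asyncio.get_running_loop().call_later(
            msg["delay"], self._replies.put_nowait, reply
        )

    async def recv(self):
        return await self._replies.get()


class Multiplexer:
    def __init__(self, socket):
        self.socket = socket
        self.pending = {}  # request id -> Future awaiting the reply
        self.ids = itertools.count()

    async def reader(self):
        # Single reader task: routes each reply to the Future of its request.
        while True:
            msg = json.loads(await self.socket.recv())
            future = self.pending.pop(msg["id"], None)
            if future is not None and not future.done():
                future.set_result(msg["result"])

    async def request(self, value, delay):
        request_id = next(self.ids)
        future = asyncio.get_running_loop().create_future()
        self.pending[request_id] = future
        message = {"id": request_id, "value": value, "delay": delay}
        await self.socket.send(json.dumps(message))
        return await future


async def demo():
    mux = Multiplexer(FakeSocket())
    reader = asyncio.create_task(mux.reader())
    # The first request is answered last, yet each caller gets its own result.
    results = await asyncio.gather(mux.request(1, 0.05), mux.request(2, 0.01))
    reader.cancel()
    return results
```

With a single reader task owning `recv()`, any number of requests can be in flight at once, which is what removes the one-at-a-time limitation of the class above. It still has no error handling or timeouts.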
