Home > Back-end >  haskell TCP Server which sends an incoming message to all connections
haskell TCP Server which sends an incoming message to all connections

Time:08-31

What is the best way to write a tcp server in haskell which mantains a pool of all open connections and sends all the messages it receives to all connections?

basically a translation of this nodejs code

import net from "net";

const conns = [];

net.createServer((socket) => {
    conns.push(socket);

    socket.on("data", (data) => {
        for (const conn of conns) {
            conn.write(data);
        }
    });

    socket.on("close", () => {
        conns.splice(conns.indexOf(socket), 1);
    });
}).listen(1337);

CodePudding user response:

It's always hard to say what the "best" way is. Here's one way:

import Control.Concurrent
import Control.Concurrent.Async  -- from async
import Control.Monad
import Control.Monad.Loops       -- from monad-loops
import Network.Simple.TCP        -- from network-simple

chunk = 4096

main = do
  bcast <- newChan
  forkIO $ forever $ readChan bcast
  serve (Host "127.0.0.1") "8080" $ \(s, addr) -> do
    bcast' <- dupChan bcast
    race_ (forever $ readChan bcast' >>= send s)
          (whileJust_ (recv s 4096) $ writeChan bcast')

In more detail, Control.Concurrent has channels that support simple broadcast operation via a dupChan operation. Here, we create a master broadcast channel:

bcast <- newChan

This master channel and any of its duplicates created via dupChan can be written to, and the resulting data will be available on all duplicated copies.

Because we will use the master channel only for making duplicates and not for reading and writing, we'll need to fork a thread to drain it so data doesn't accumulate:

forkIO $ forever $ readChan bcast

Now, we use serve to serve clients. For each client that connects, the do-block is run:

serve (Host "127.0.0.1") "8080" $ \(s, addr) -> do ...

In the do-block, we create our per-client duplicate of the master channel:

bcast' <- dupChan bcast

Then, we fork two threads, one that reads any data broadcast on the bcast' channel and sends it out to the socket, and another that reads input from the socket and broadcasts it. We race_ these threads so that if either one completes, the other will be killed:

race_ (forever $ readChan bcast' >>= send s)
      (whileJust_ (recv s 4096) $ writeChan bcast')

Usually, if a client disconnects, recv will return Nothing and the second thread will end. This will cause race_ to kill the first.

There is a small race condition however. If a client disconnects and a broadcast is sent and processed by the first thread before the second thread returns and race_ kills the first, send will raise an exception. This will cause race_ to kill the second thread (as desired) but then re-raise the exception, resulting in an ugly error message on the console.

You can replace race_ with raceQuietly_ defined as follows if you want to avoid this:

import Control.Exception.Safe

raceQuietly_ x y = race_ x y `catchIO` (\_ -> return ())
  • Related