What is the best way to write a tcp server in haskell which mantains a pool of all open connections and sends all the messages it receives to all connections?
basically a translation of this nodejs code
import net from "net";
const conns = [];
net.createServer((socket) => {
conns.push(socket);
socket.on("data", (data) => {
for (const conn of conns) {
conn.write(data);
}
});
socket.on("close", () => {
conns.splice(conns.indexOf(socket), 1);
});
}).listen(1337);
CodePudding user response:
It's always hard to say what the "best" way is. Here's one way:
import Control.Concurrent
import Control.Concurrent.Async -- from async
import Control.Monad
import Control.Monad.Loops -- from monad-loops
import Network.Simple.TCP -- from network-simple
chunk = 4096
main = do
bcast <- newChan
forkIO $ forever $ readChan bcast
serve (Host "127.0.0.1") "8080" $ \(s, addr) -> do
bcast' <- dupChan bcast
race_ (forever $ readChan bcast' >>= send s)
(whileJust_ (recv s 4096) $ writeChan bcast')
In more detail, Control.Concurrent
has channels that support simple broadcast operation via a dupChan
operation. Here, we create a master broadcast channel:
bcast <- newChan
This master channel and any of its duplicates created via dupChan
can be written to, and the resulting data will be available on all duplicated copies.
Because we will use the master channel only for making duplicates and not for reading and writing, we'll need to fork a thread to drain it so data doesn't accumulate:
forkIO $ forever $ readChan bcast
Now, we use serve
to serve clients. For each client that connects, the do-block is run:
serve (Host "127.0.0.1") "8080" $ \(s, addr) -> do ...
In the do-block, we create our per-client duplicate of the master channel:
bcast' <- dupChan bcast
Then, we fork two threads, one that reads any data broadcast on the bcast'
channel and sends it out to the socket, and another that reads input from the socket and broadcasts it. We race_
these threads so that if either one completes, the other will be killed:
race_ (forever $ readChan bcast' >>= send s)
(whileJust_ (recv s 4096) $ writeChan bcast')
Usually, if a client disconnects, recv
will return Nothing
and the second thread will end. This will cause race_
to kill the first.
There is a small race condition however. If a client disconnects and a broadcast is sent and processed by the first thread before the second thread returns and race_
kills the first, send
will raise an exception. This will cause race_
to kill the second thread (as desired) but then re-raise the exception, resulting in an ugly error message on the console.
You can replace race_
with raceQuietly_
defined as follows if you want to avoid this:
import Control.Exception.Safe
raceQuietly_ x y = race_ x y `catchIO` (\_ -> return ())