Akka streams websocket stream things to a Sink.seq ends with exception SubscriptionWithCancelExcepti

Time:12-21

I'm failing to materialize a Sink.seq. When it comes time to materialize it, the stream fails with this exception:

akka.stream.SubscriptionWithCancelException$StageWasCompleted$:

Here is the full source code on github: https://github.com/Christewart/bitcoin-s-core/blob/aaecc7c180e5cc36ec46d73d6b2b0b0da87ab260/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala#L51

I'm attempting to aggregate all elements being pushed out of a websocket into a Sink.seq. I have to do a bit of JSON transformation before I aggregate things inside the Sink.seq.


      
  val endSink: Sink[WalletNotification[_], Future[Seq[WalletNotification[_]]]] =
    Sink.seq[WalletNotification[_]]

  val sink: Sink[Message, Future[Seq[WalletNotification[_]]]] = Flow[Message]
    .map {
      case message: TextMessage.Strict =>
        //we should be able to parse the address message
        val text = message.text
        val notification: WalletNotification[_] = {
          upickle.default.read[WalletNotification[_]](text)(
            WsPicklers.walletNotificationPickler)
        }
        logger.info(s"Notification=$notification")
        notification
      case msg =>
        logger.error(s"msg=$msg")
        sys.error("")
    }
    .log(s"@@@ endSink @@@")
    .toMat(endSink)(Keep.right)

      val f: Flow[
        Message,
        Message,
        (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])] = {
        Flow
          .fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)
      }

      val tuple: (
          Future[WebSocketUpgradeResponse],
          (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
        Http()
          .singleWebSocketRequest(req, f)
      }

      val walletNotificationsF: Future[Seq[WalletNotification[_]]] =
        tuple._2._1

      val promise: Promise[Option[Message]] = tuple._2._2
      logger.info(s"Requesting new address for expectedAddrStr")
      val expectedAddressStr = ConsoleCli
        .exec(CliCommand.GetNewAddress(labelOpt = None), cliConfig)
        .get
      val expectedAddress = BitcoinAddress.fromString(expectedAddressStr)

       
      promise.success(None)
      logger.info(s"before notificationsF")

      // hangs here: the future never completes successfully, it fails with the exception above
      for {
        notifications <- walletNotificationsF
        _ = logger.info(s"after notificationsF")
      } yield {
        //assertions in here...
      }

What am I doing wrong?

CodePudding user response:

To keep the client connection open you need a bit more code, something like this:

val sourceKickOff = Source
  .single(TextMessage("kick off msg"))
  // Keeps the connection open
  .concatMat(Source.maybe[Message])(Keep.right)

See the full working example, which consumes messages from a server: https://github.com/pbernet/akka_streams_tutorial/blob/b6d4c89a14bdc5d72c557d8cede59985ca8e525f/src/main/scala/akkahttp/WebsocketEcho.scala#L280
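
To see what that source does on its own, here is a minimal sketch that runs it without any websocket. It assumes Akka 2.6 with akka-http on the classpath; the object name KickOffSketch and the 3 second timeout are made up for illustration. The source emits the kick-off message and then stays open until the promise from Source.maybe is completed.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of the linked example.
object KickOffSketch {
  def run()(implicit system: ActorSystem): Seq[Message] = {
    // One initial message, then an open-ended tail controlled by a promise.
    val sourceKickOff: Source[Message, Promise[Option[Message]]] =
      Source
        .single[Message](TextMessage("kick off msg"))
        .concatMat(Source.maybe[Message])(Keep.right)

    // Run it into Sink.seq only to observe what the source emits.
    val (promise, emittedF) =
      sourceKickOff.toMat(Sink.seq[Message])(Keep.both).run()

    // The source does not complete until the promise is completed.
    promise.success(None)
    Await.result(emittedF, 3.seconds)
  }
}
```

In the question's code, a source like this would take the place of Source.maybe[Message] in Flow.fromSinkAndSourceMat. Whether the server actually expects a kick-off message is an assumption.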

CodePudding user response:

The problem is this line:

Flow.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)

It needs to be:

Flow.fromSinkAndSourceCoupledMat(sink, Source.maybe[Message])(Keep.both)

The Coupled variant ties the lifecycles of the two sides together: when the Source side terminates, the Sink side is terminated as well, so the Future materialized by Sink.seq can complete.
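
A minimal sketch of that coupling without a websocket, assuming Akka 2.6. The Int elements, the never-completing upstream, the object name CoupledSketch and the sleep are stand-ins made up for illustration, not taken from the question. The upstream never completes by itself, so the Sink.seq future can only complete because completing the Source.maybe promise is propagated to the sink side.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper for illustration only.
object CoupledSketch {
  def run()(implicit system: ActorSystem): Seq[Int] = {
    // Sink and Source share one lifecycle in the coupled variant.
    val coupled =
      Flow.fromSinkAndSourceCoupledMat(Sink.seq[Int], Source.maybe[Int])(Keep.both)

    // Stand-in for the server side: three elements, then silence forever.
    val (collectedF, promise) =
      Source(1 to 3)
        .concat(Source.never[Int])
        .viaMat(coupled)(Keep.right)
        .to(Sink.ignore)
        .run()

    Thread.sleep(500)      // crude: give the elements time to arrive
    promise.success(None)  // completes the Source side; the coupling completes Sink.seq
    Await.result(collectedF, 3.seconds)
  }
}
```

With the uncoupled Flow.fromSinkAndSourceMat in the same setup, the Await would time out, because nothing ever completes the sink side.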
