Home > Software engineering >  Use a Actor as a source to my Websocket client flow
Use a Actor as a source to my Websocket client flow

Time:10-25

I currently have a simple TextMessage Source that sends messages to my Websocket client flow like this:

     val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
      }

    // send this as a message over the WebSocket
    val outgoing: Source[TextMessage.Strict, NotUsed] = Source
      .combine(
        Source.single(
          TextMessage(
            """{"auth":"APIKEY-123"}"""
          )
        ),
        Source.single(
          TextMessage(
            """{"topic":"topic123"}"""
          )
        ),
        Source.never
      )(Merge(_))
      .throttle(1, 1.second)

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(
          Keep.right
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

So I currently have a Source of type Source[TextMessage.Strict, NotUsed], but I want to use the commented out code where I have a ActorRef as my source.

I tried this:

  val actorSource: Source[Any, ActorRef] = Source.actorRef(
  completionMatcher = { case Done =>
    CompletionStrategy.immediately
  },
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
actorRef !  """{"auth":"APIKEY-123"}"""

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val (upgradeResponse, closed) =
  actorSource
    .viaMat(webSocketFlow)(
      Keep.right
    ) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()

So when I am using a ActorRef as my source, I am having a hard time trying to fit this into the graph. I am getting this compile time error:

type mismatch; [error] found : akka.stream.scaladsl.Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse]] [error] required: akka.stream.Graph[akka.stream.FlowShape[String,?],?] [error]
.viaMat(webSocketFlow)(

Note: I want a Actor as my source, and also as my sink i.e. pass all messages that result from the flow to another actor as a sink.

Can someone explain what am I currently doing wrong with my Actor as a source and trying to add it to my flow/graph?

Update

Here is the code I have now:

def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    val actorSource = Source.actorRef[String](
      completionMatcher = { case Done =>
        CompletionStrategy.immediately
      },
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val ((sendActor, upgradeResponse), closed) =
      actorSource
        .viaMat(webSocketFlow)(
          Keep.both
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

    sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    sendActor ! TextMessage("""{"topic":"topic123"}""")

    //in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }

I am getting the following compile errors:

[error] The argument types of an anonymous function must be fully known. (SLS 8.5) [error] Expected type was: ? [error]
completionMatcher = { case Done => [error] ^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:57:37: value flatMap is not a member of Any [error] val connected = upgradeResponse.flatMap { upgrade => [error]
^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:67:15: value ! is not a member of Any [error] sendActor ! TextMessage("""{"auth":"APIKEY-123"}""") [error] ^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:68:15: value ! is not a member of Any [error] sendActor ! TextMessage("""{"topic":"topic123"}""") [error] ^ [error] /home/blank/scala/testing/streamsapp/ws2.scala:72:12: value foreach is not a member of Any [error] closed.foreach(_ => println("closed")) [error] ^ [error] 5 errors found

CodePudding user response:

Your compiler error is arising from your actorSource not outputting Message but String (that error shouldn't be the one you'd get with your code, perhaps you tried changing it to a Source[String, ActorRef]?): since the webSocketFlow only processes Messages, it can only be attached to a source of Message.

So I suggest something along the lines of:

val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
  case Done => CompletionStrategy.immediately
}

val actorSource = Source.actorRef[Message](
  completionMatcher = immediateCompletion,
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val ((sendActor, upgradeResponse), closed) =
  actorSource
    .viaMat(webSocketFlow)(Keep.both)  // keep both the actor and the upgradeResponse
    .toMat(incoming)(Keep.both)  // ...and also keep the closed
    .run()

sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")
  • Related