I connected to my websocket service using this sample code client, but currently it just connects and then shutsdown.
How can I keep this connection open and never close it?
Once I make a connection, I want it to remain open until I shutdown the application.
package docs.http.scaladsl
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
// send this as a message over the WebSocket
val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
Update I have the same code as above, except I updated my sources to look like this:
val source1 = Source.single(TextMessage("""{"action":"auth","params":"APIKEY_123"}"""))
val source2 = Source.single(TextMessage("""{"action":"subscribe","params":"topic123"}"""))
val sources: Source[Message, NotUsed] =
Source.combine(source1, source2, Source.maybe)(Concat(_))
So I can see my source1, and source2 are sent to the websocket, but the websocket does not start streaming its values as it should, it just hangs.
Not sure what I am doing wrong...
CodePudding user response:
The Akka docs call out your situation:
The Akka HTTP WebSocket API does not support half-closed connections which means that if either stream completes the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed).
In your case, outgoing
(being a Source.single
) completes as soon as it has emitted the TextMessage
. The webSocketFlow
receives the completion message and then tears down the connection.
The solution is to delay when outgoing
completes, perhaps even delaying it forever (or at least until the application is killed).
Two standard sources are potentially useful for delaying completion in the scenario where you don't want to send messages through the websocket.
Source.maybe
materializes as aPromise
which you can complete with an optional terminating message. It will not complete unless and until the promise is completed.Source.never
never completes. You could achieve this by just not completingSource.maybe
, but this is less overhead than that.
So what would it look like in code?
val outgoing =
Source.single(TextMessage("hello world!"))
.concat(Source.never)
For Source.maybe
, you'll want .concatMat
so that the Promise
is available for completion; this does mean that you'll get something like val (completionPromise, upgradeResponse, closed)
as the overall materialized value:
val outgoing =
Source.single(TextMessage("hello world!"))
.concatMat(Source.maybe[TextMessage])(Keep.right)
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
In the situation where you want to send arbitrarily many messages through the socket, Source.actorRef
or Source.queue
are handy: send messages to the materialized actor ref to send them through the websocket connection (sending a special message to complete the source) or offer
messages to the queue and then complete
it.
val outgoing =
Source.actorRef[TextMessage](
completionMatcher = {
case Done =>
CompletionStrategy.draining // send the messages already sent before completing
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropNew
)
val ((sendToSocketRef, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
sendToSocketRef ! TextMessage("hello world!")