I'm just trying out this sample stream that currently has a single TextMessage as a source:
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("hello world!"))
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
I want to send 2 messages, so I tried this:
val source1 = Source.single(TextMessage("hello"))
val source2 = Source.single(TextMessage("world"))
val helloSource: Source[Message, NotUsed] =
Source.combine(source2)
But I get this error:
polymorphic expression cannot be instantiated to expected type;
[error] found : [U](strategy: Int => akka.stream.Graph[akka.stream.UniformFanInShape[akka.http.scaladsl.model.ws.TextMessage.Strict,U],akka.NotUsed]): akka.stream.scaladsl.Source[U,akka.NotUsed]
[error] required: akka.stream.scaladsl.Source[akka.http.scaladsl.model.ws.Message,akka.NotUsed]
[error] Source.combine(source1, source2)
[error] ^
[error] one error found
What exactly should I be doing instead?
CodePudding user response:
Source.combine
is a flexible way to combine multiple sources, and you need to specify a strategy for combining them, as described in the linked documentation.
In this case, where you want to have one finite source be followed by another, you can use the Concat
strategy.
val helloSource: Source[Message, NotUsed] =
Source.combine(source1, source2)(Concat(_))
As a simpler alternative, you can use the concat
method on the first source:
val helloSource: Source[Message, NotUsed] =
source1.concat(source2)
However, for this example, where you have a fixed set of hardcoded elements, it's even simpler to avoid the creation of multiple sources and create only a single source from an Iterable
with Source.apply
:
val helloSource: Source[Message, NotUsed] =
Source(Seq(TextMessage("hello"), TextMessage("world")))