Home > Blockchain >  How do I send multiple messages as my source
How do I send multiple messages as my source

Time:10-15

I'm just trying out this sample stream that currently has a single TextMessage as a source:

 // print each incoming strict text message
    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    val helloSource: Source[Message, NotUsed] =
      Source.single(TextMessage("hello world!"))

    // the Future[Done] is the materialized value of Sink.foreach
    // and it is completed when the stream completes
    val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

I want to send 2 messages, so I tried this:

val source1 = Source.single(TextMessage("hello"))
val source2 = Source.single(TextMessage("world"))

val helloSource: Source[Message, NotUsed] =
  Source.combine(source2)

But I get this error:

 polymorphic expression cannot be instantiated to expected type;
[error]  found   : [U](strategy: Int => akka.stream.Graph[akka.stream.UniformFanInShape[akka.http.scaladsl.model.ws.TextMessage.Strict,U],akka.NotUsed]): akka.stream.scaladsl.Source[U,akka.NotUsed]
[error]  required: akka.stream.scaladsl.Source[akka.http.scaladsl.model.ws.Message,akka.NotUsed]
[error]       Source.combine(source1, source2)
[error]                     ^
[error] one error found

What exactly should I be doing instead?

CodePudding user response:

Source.combine is a flexible way to combine multiple sources, and you need to specify a strategy for combining them, as described in the linked documentation.

In this case, where you want to have one finite source be followed by another, you can use the Concat strategy.

val helloSource: Source[Message, NotUsed] =
  Source.combine(source1, source2)(Concat(_))

As a simpler alternative, you can use the concat method on the first source:

val helloSource: Source[Message, NotUsed] =
  source1.concat(source2)

However, for this example, where you have a fixed set of hardcoded elements, it's even simpler to avoid the creation of multiple sources and create only a single source from an Iterable with Source.apply:

val helloSource: Source[Message, NotUsed] =
  Source(Seq(TextMessage("hello"), TextMessage("world")))
  • Related