Home > OS >  Concatinating two Flows in Akka stream
Concatinating two Flows in Akka stream

Time:10-28

I am trying to concat two Flows and I am not able to explain the output of my implementation.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s   1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

I expect the following output from this code.

2
3
4
.
.
.
11
10
20
.
.
.
100

Instead, I see only "2" being printed. Can you please explain what is wrong in my implmentation and how should I change the program to get the desired output.

CodePudding user response:

From Akka Stream's API docs:

Concat:

Emits when the current stream has an element available; if the current input completes, it tries the next one

Broadcast:

Emits when all of the outputs stops backpressuring and there is an input element available

The two operators won't work in conjunction as there is a conflict in how they work -- Concat tries to pull all elements from one of Broadcast's outputs before switching to the other one, whereas Broadcast won't emit unless there is demand for ALL of its outputs.

For what you need, you could concatenate using concat as suggested by commenters:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

or equivalently, use Source.combine like below:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)

CodePudding user response:

Using GraphDSL, which is a simplified version of the implementation of Source.combine:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
  • Related