I am trying to concat two Flows and I am not able to explain the output of my implementation.
val source = Source(1 to 10)
val sink = Sink.foreach(println)
val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)
val flowGraph = Flow.fromGraph(
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val concat = builder.add(Concat[Int](2))
    val broadcast = builder.add(Broadcast[Int](2))
    broadcast ~> flow1 ~> concat.in(0)
    broadcast ~> flow2 ~> concat.in(1)
    FlowShape(broadcast.in, concat.out)
  }
)
source.via(flowGraph).runWith(sink)
I expect the following output from this code.
2
3
4
.
.
.
11
10
20
.
.
.
100
Instead, I see only "2" being printed. Can you please explain what is wrong in my implementation and how I should change the program to get the desired output?
CodePudding user response:
From the Akka Streams API docs:
Concat: Emits when the current stream has an element available; if the current input completes, it tries the next one
Broadcast: Emits when all of the outputs stops backpressuring and there is an input element available
The two operators won't work in conjunction because of a conflict in how they work: Concat tries to pull all elements from one of Broadcast's outputs before switching to the other one, whereas Broadcast won't emit unless there is demand for ALL of its outputs.
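If the Broadcast/Concat graph shape has to stay, one possible workaround (my own sketch, not taken from the API docs quoted above) is to put a buffer on the second branch so that Broadcast sees demand on both outputs while Concat is still draining the first input. This assumes Akka 2.6+, where the implicit ActorSystem also supplies the materializer. The names bufferSize, bufferedGraph and buffered are made up for this example, and the approach only works for a bounded stream whose length does not exceed the buffer size.

```scala
import akka.actor.ActorSystem
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Concat, Flow, GraphDSL, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumption: Akka 2.6+, the implicit ActorSystem provides the materializer.
implicit val system: ActorSystem = ActorSystem("buffer-demo") // name is arbitrary

// Hypothetical value: it must be at least the number of elements in the stream,
// otherwise the second branch backpressures again and the graph stalls.
val bufferSize = 16

val bufferedGraph = Flow.fromGraph(
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2))
    val concat = builder.add(Concat[Int](2))
    // First branch: consumed by Concat right away.
    broadcast ~> Flow[Int].map(_ + 1) ~> concat.in(0)
    // Second branch: the buffer keeps pulling while Concat is busy with in(0).
    broadcast ~> Flow[Int].map(_ * 10).buffer(bufferSize, OverflowStrategy.backpressure) ~> concat.in(1)
    FlowShape(broadcast.in, concat.out)
  }
)

// Collect the elements instead of printing them one by one.
val buffered = Await.result(Source(1 to 10).via(bufferedGraph).runWith(Sink.seq), 5.seconds)
println(buffered)

system.terminate()
```

The buffer has to hold the whole second branch in memory until the first one completes, so for large or unbounded streams concatenating two separate sources is the safer choice.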
For what you need, you could concatenate using concat, as suggested by commenters:
source.via(flow1).concat(source.via(flow2)).runWith(sink)
or equivalently, use Source.combine like below:
Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
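For completeness, here is a self-contained sketch that runs both variants and compares them. It assumes Akka 2.6+ (the implicit ActorSystem supplies the materializer); the names viaConcat, viaCombine, resultConcat and resultCombine are only for illustration, and Sink.seq replaces the println sink so the results can be compared.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumption: Akka 2.6+, the implicit ActorSystem provides the materializer.
implicit val system: ActorSystem = ActorSystem("concat-demo") // name is arbitrary

val source = Source(1 to 10)
val flow1 = Flow[Int].map(_ + 1)
val flow2 = Flow[Int].map(_ * 10)

// Variant 1: the concat operator emits everything from the first source, then the second.
val viaConcat = source.via(flow1).concat(source.via(flow2))

// Variant 2: Source.combine with Concat as the fan-in strategy.
val viaCombine = Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_))

// Collect into a Seq instead of printing element by element.
val resultConcat = Await.result(viaConcat.runWith(Sink.seq), 5.seconds)
val resultCombine = Await.result(viaCombine.runWith(Sink.seq), 5.seconds)

println(resultConcat)
println(resultConcat == resultCombine)

system.terminate()
```

Both variants materialize the source twice, once per branch, which is why neither needs a Broadcast. Each should yield 2 to 11 followed by 10 to 100 in steps of 10.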
CodePudding user response:
Using GraphDSL, which is a simplified version of the implementation of Source.combine:
val sg = Source.fromGraph(
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val concat = builder.add(Concat[Int](2))
    source ~> flow1 ~> concat
    source ~> flow2 ~> concat
    SourceShape(concat.out)
  }
)
sg.runWith(sink)