I have some code that is similar to the following:
object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val input = builder.add(Balance[Int](1)) // Question 1) how to get rid of this input
      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val balance = builder.add(Balance[Int](2))
      val flow1 = Flow[Int].map(_ * 2)
      val flow2 = Flow[Int].map(_ * 2)
      // The operator between left and right was lost in the original post; + is assumed
      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => left + right))
      val flow3 = Flow[Int].map(_ * 2)

      input ~> buffer ~> balance.in
      balance.out(0) ~> flow1 ~> zip.in0
      balance.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3

      FlowShape(input.in, flow3) // Question 2) how to make an outlet here
    })
}
Notice that I had to add a Balance called input, because I cannot retrieve an Inlet from the first Buffer of the FlowShape I want to create. Is there a simpler way to solve this? Creating a Balance with 1 Outlet seems to be the wrong way to do this.
My second question is similar. I cannot retrieve an Outlet from flow3. The only way I know to solve this problem is to create yet another Balance and expose its Outlet as the Outlet of the entire FlowShape. Is there a better way to solve this problem?
Answer:
A Balance is a fan-out shape that sends each element to only one output, the first one that is available. Since you are zipping the two branches in the next step, what you need is a Broadcast. It sends every element to all outputs, and it only emits when all of them are available.
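To make the difference concrete, here is a minimal sketch that puts either fan-out stage in front of a Zip. It assumes Akka 2.6 or later (where an implicit ActorSystem provides the materializer); pairVia and FanOutComparison are hypothetical names used only for this illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, Graph, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._

object FanOutComparison {
  // Hypothetical helper: fans out with the given stage, then zips both branches.
  def pairVia(fanOut: Graph[UniformFanOutShape[Int, Int], NotUsed]): Flow[Int, (Int, Int), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val split = builder.add(fanOut)
      val zip = builder.add(Zip[Int, Int]())
      split.out(0) ~> zip.in0
      split.out(1) ~> zip.in1
      FlowShape(split.in, zip.out)
    })

  // Runs 1 to 4 through both variants and returns (broadcast result, balance result).
  def run(): (Seq[(Int, Int)], Seq[(Int, Int)]) = {
    implicit val system: ActorSystem = ActorSystem("fan-out-comparison")
    try {
      val broadcasted =
        Await.result(Source(1 to 4).via(pairVia(Broadcast[Int](2))).runWith(Sink.seq), 5.seconds)
      val balanced =
        Await.result(Source(1 to 4).via(pairVia(Balance[Int](2))).runWith(Sink.seq), 5.seconds)
      (broadcasted, balanced)
    } finally system.terminate()
  }
}
```

With Broadcast, every element reaches both branches, so each pair holds the same element twice: (1,1), (2,2), (3,3), (4,4). With Balance, each element goes down only one branch, so the zip should pair up different elements and produce half as many results.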
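Also, the builder can add any shape that is a Graph, and this includes Flow. Adding a Flow to the builder gives you back a FlowShape whose in and out ports you can wire up or expose directly, so you don't have to use a custom shape for that.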
The updated code:
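import akka.NotUsed
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, ZipWith}

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      // Adding a Flow to the builder yields a FlowShape with in and out ports
      val input = builder.add(buffer)
      val broadcast = builder.add(Broadcast[Int](2))
      val flow1 = Flow[Int].map(_ * 2)
      val flow2 = Flow[Int].map(_ * 2)
      // The operator between left and right was lost in the original post; + is assumed
      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => left + right))
      val flow3 = builder.add(Flow[Int].map(_ * 2))

      input ~> broadcast.in
      broadcast.out(0) ~> flow1 ~> zip.in0
      broadcast.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3.in

      // Expose the buffer's inlet and flow3's outlet as the ports of the whole flow
      FlowShape(input.in, flow3.out)
    })
}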
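To check that the resulting Flow behaves as expected, it can be run against a small Source. This is only a sketch under a few assumptions: Akka 2.6 or later (an implicit ActorSystem provides the materializer), the zip function is taken to be left + right, and RunComplicatedFlow and run are hypothetical names for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}

import scala.concurrent.Await
import scala.concurrent.duration._

object RunComplicatedFlow {
  // Same wiring as the updated code: buffer -> broadcast -> two branches -> zip -> flow3
  val someComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val input = builder.add(Flow[Int].buffer(12, OverflowStrategy.backpressure))
      val broadcast = builder.add(Broadcast[Int](2))
      // Assumption: the two branches are summed
      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => left + right))
      val flow3 = builder.add(Flow[Int].map(_ * 2))

      input ~> broadcast.in
      broadcast.out(0) ~> Flow[Int].map(_ * 2) ~> zip.in0
      broadcast.out(1) ~> Flow[Int].map(_ * 2) ~> zip.in1
      zip.out ~> flow3.in

      FlowShape(input.in, flow3.out)
    })

  // Sends 1 to 5 through the flow and collects the results.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("run-complicated-flow")
    try Await.result(Source(1 to 5).via(someComplicatedFlow).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Each element x is doubled in both branches, summed to 4x by the zip, and doubled again by flow3, so under these assumptions run() returns 8, 16, 24, 32, 40.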