Akka Streams: How to form inlets and outlets for a Graph using a Flow

Time:11-29

I have some code that is similar to the following:

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val balance = builder.add(Balance[Int](2))

      val flow1 = Flow[Int].map(_*2)
      val flow2 = Flow[Int].map(_*2)

      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
        left + right
      }))

      val flow3 = Flow[Int].map(_*2)

      input ~> buffer ~> balance.in
      balance.out(0) ~> flow1 ~> zip.in0
      balance.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3

      FlowShape(input.in, flow3) //Question 2) how to make an outlet here
    })
}

Notice that I had to add a Balance called input, because I cannot retrieve an Inlet from the first Buffer of the FlowShape I want to create. Is there any other simpler way to solve this? Creating a Balance with 1 Outlet seems to be the wrong way to do this.

My second question is similar. I cannot retrieve an Outlet from flow3. The only way I know to solve this problem is to create yet another Balance, and expose its Outlet as the Outlet of the entire FlowShape. Any better way to solve this problem?

Answer:

A Balance is a fan-out stage that sends each element to exactly one output, whichever is available first. Since you are zipping the two branches in the next step, what you need is a Broadcast: it emits each element to every output, and only once all of them are ready to receive it.
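To make the difference concrete, here is a minimal sketch that zips the two outputs of each fan-out stage directly. It assumes Akka 2.6 or later (where an implicit ActorSystem provides the materializer); the names FanOutComparison, zipped and run are hypothetical and only serve the illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, Graph, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import scala.concurrent.Await
import scala.concurrent.duration._

object FanOutComparison {
  // Assumption: Akka 2.6+, so the ActorSystem doubles as the materializer.
  implicit val system: ActorSystem = ActorSystem("fan-out-comparison")

  // Wires a 2-output fan-out stage straight into a ZipWith that sums its inputs.
  def zipped(fanOut: Graph[UniformFanOutShape[Int, Int], NotUsed]): Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val split = builder.add(fanOut)
      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => left + right))
      split.out(0) ~> zip.in0
      split.out(1) ~> zip.in1
      FlowShape(split.in, zip.out)
    })

  // Runs the elements 1 to 4 through the given flow and collects the output.
  def run(flow: Flow[Int, Int, NotUsed]): Seq[Int] =
    Await.result(Source(1 to 4).via(flow).runWith(Sink.seq), 3.seconds)

  def main(args: Array[String]): Unit = {
    // Broadcast: every element reaches both zip inputs, so each is added to itself.
    println(run(zipped(Broadcast[Int](2)))) // Vector(2, 4, 6, 8)
    // Balance: every element reaches only one zip input, so different elements
    // get paired and at most two results come out of four inputs.
    println(run(zipped(Balance[Int](2))))
    system.terminate()
  }
}
```

With Broadcast the output has one result per input element; with Balance the zip stage consumes two distinct input elements per result, which is usually not what a "split, transform, recombine" graph intends.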

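Also, the builder can add anything that is a Graph, and that includes a Flow. builder.add returns the Flow's FlowShape, whose in and out ports you can use directly, so you don't need a dummy fan-out stage just to obtain an Inlet or an Outlet.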

The updated code:

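import akka.NotUsed
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, ZipWith}

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Adding the buffer Flow to the builder yields a FlowShape that exposes `in`
      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val input = builder.add(buffer)
      val broadcast = builder.add(Broadcast[Int](2))

      // Inner flows can be used with ~> without being added explicitly
      val flow1 = Flow[Int].map(_ * 2)
      val flow2 = Flow[Int].map(_ * 2)

      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
        left + right
      }))

      // Added to the builder so that its `out` port can become the graph's outlet
      val flow3 = builder.add(Flow[Int].map(_ * 2))

      input ~> broadcast.in
      broadcast.out(0) ~> flow1 ~> zip.in0
      broadcast.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3.in

      FlowShape(input.in, flow3.out)
    })
}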
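
As a quick check of the wiring, the following sketch runs a few elements through a flow built the same way as the updated code. It assumes Akka 2.6 or later and that the zip function adds its two inputs; the names RunComplicatedFlow and run are hypothetical, and the graph is repeated only so the sketch compiles on its own.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import scala.concurrent.Await
import scala.concurrent.duration._

object RunComplicatedFlow {
  // Assumption: Akka 2.6+, so the ActorSystem doubles as the materializer.
  implicit val system: ActorSystem = ActorSystem("run-complicated-flow")

  // Same wiring as the updated code: buffer ~> broadcast ~> two branches ~> zip ~> map.
  val someComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val input = builder.add(Flow[Int].buffer(12, OverflowStrategy.backpressure))
      val broadcast = builder.add(Broadcast[Int](2))
      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => left + right))
      val output = builder.add(Flow[Int].map(_ * 2))

      input ~> broadcast.in
      broadcast.out(0) ~> Flow[Int].map(_ * 2) ~> zip.in0
      broadcast.out(1) ~> Flow[Int].map(_ * 2) ~> zip.in1
      zip.out ~> output.in

      FlowShape(input.in, output.out)
    })

  // Feeds 1 to 5 through the flow and waits for the collected result.
  def run(): Seq[Int] =
    Await.result(Source(1 to 5).via(someComplicatedFlow).runWith(Sink.seq), 3.seconds)

  def main(args: Array[String]): Unit = {
    // Each input x is doubled on both branches, summed (4x), then doubled again (8x).
    println(run()) // Vector(8, 16, 24, 32, 40)
    system.terminate()
  }
}
```

From the outside the composite behaves like any other Flow[Int, Int, NotUsed], so it can be passed to via like a built-in stage.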