Akka Streams - fan out with filter

Time:10-11

I'm trying to apply a separate filter to each of the 2 output streams of a fan-out. I emit 3 objects of type Test, defined as:

case class Test(test: String, tester: Double)

Running the code below does not produce any output:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}

object TestFanOut extends App {
  implicit val actorSystem = ActorSystem()

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
  val output = Sink.foreach[(Test , Test)](println)
  val input = Source.repeat(Test("1" , 23)).take(3)

  case class Test(test: String, tester: Double)

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Test](2))
      val zip = builder.add(Zip[Test, Test]())

      input ~> broadcast

      broadcast.out(0) ~> filter1 ~> zip.in0
      broadcast.out(1) ~> filter2 ~> zip.in1

      zip.out ~> output

      ClosedShape

    }
  )

  graph.run()
}

However, changing the filters to:

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))

produces:

(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))

It seems my filtering logic is not being applied correctly?

When I define the filters as:

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))

I expect this output:

(Test(1,23.0),Test(1,23.0))
(Test(1,23.0))

since the element matches the filter on one of the fan-out streams.

CodePudding user response:

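Zip emits a pair only when an element is available on both of its inputs. Here that never happens: filter1 drops every element (no Test has test equal to "2"), so zip.in0 never receives anything and no pair is ever emitted.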
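To see this behaviour in isolation, here is a minimal sketch that zips two filtered copies of the same three-element source. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream). The object name ZipSemantics and the helper zipped are made up for illustration, and the source is simply materialized twice here rather than broadcast.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ZipSemantics {
  final case class Test(test: String, tester: Double)

  // Hypothetical helper: zip two filtered copies of the same three-element source.
  def zipped(leftKey: String, rightKey: String)(
      implicit system: ActorSystem): Future[Seq[(Test, Test)]] = {
    val input = Source.repeat(Test("1", 23)).take(3)
    input
      .filter(_.test.equalsIgnoreCase(leftKey))
      .zip(input.filter(_.test.equalsIgnoreCase(rightKey)))
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("zip-semantics")
    // Left side filters everything out, so zip has nothing to pair with.
    println(Await.result(zipped("2", "1"), 5.seconds).size) // 0
    // Both sides pass all three elements, so zip emits three pairs.
    println(Await.result(zipped("1", "1"), 5.seconds).size) // 3
    system.terminate()
  }
}
```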

Instead, you could use Merge, which forwards whatever arrives on either input (add Merge to the akka.stream.scaladsl import):

val output = Sink.foreach[Test](println)

val graph = RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Test](2))
    val merge = builder.add(Merge[Test](2))

    input ~> broadcast

    broadcast.out(0) ~> filter1 ~> merge
    broadcast.out(1) ~> filter2 ~> merge

    merge ~> output

    ClosedShape

  }
) 

This would print:

Test(1,23.0)
Test(1,23.0)
Test(1,23.0)

On the other hand, if you want zipped content, you could refactor using Option, so that each branch still emits one element per input (None when the filter does not match):

val filter1 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("2")))
val filter2 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("1")))
val output = Sink.foreach[(Option[Test] , Option[Test])](println)
val input = Source.repeat(Option(Test("1" , 23))).take(3)

val graph = RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Option[Test]](2))
    val zip = builder.add(Zip[Option[Test], Option[Test]]())

    input ~> broadcast

    broadcast.out(0) ~> filter1 ~> zip.in0
    broadcast.out(1) ~> filter2 ~> zip.in1

    zip.out ~> output

    ClosedShape

  }
)

This would print:

(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
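A variation on the same idea, as a sketch only: the source can stay a plain Source[Test] if each branch does the wrapping itself. The helper keepIf and the object name OptionPerBranch are hypothetical, and the code assumes Akka 2.6 with Scala 2 (GraphDSL.create with a sink argument is used so the collected pairs come back as a Future).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object OptionPerBranch {
  final case class Test(test: String, tester: Double)

  // Hypothetical helper: emit Some(t) when the predicate holds, None otherwise,
  // so every branch produces exactly one element per input and Zip stays in step.
  def keepIf(p: Test => Boolean): Flow[Test, Option[Test], NotUsed] =
    Flow[Test].map(t => Some(t).filter(p))

  def run(input: Source[Test, NotUsed])(
      implicit system: ActorSystem): Future[Seq[(Option[Test], Option[Test])]] = {
    val sink = Sink.seq[(Option[Test], Option[Test])]
    RunnableGraph.fromGraph(
      GraphDSL.create(sink) { implicit builder => out =>
        import GraphDSL.Implicits._

        val broadcast = builder.add(Broadcast[Test](2))
        val zip = builder.add(Zip[Option[Test], Option[Test]]())

        input ~> broadcast
        broadcast.out(0) ~> keepIf(_.test.equalsIgnoreCase("2")) ~> zip.in0
        broadcast.out(1) ~> keepIf(_.test.equalsIgnoreCase("1")) ~> zip.in1
        zip.out ~> out

        ClosedShape
      }
    ).run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("option-per-branch")
    val pairs = Await.result(run(Source.repeat(Test("1", 23)).take(3)), 5.seconds)
    pairs.foreach(println)
    system.terminate()
  }
}
```

With the same three-element input, this should yield the same three (None, Some(...)) pairs as the version above.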