I'm attempting to apply a separate filter to each of the 2 output streams of a fan-out. I emit 3 objects of type Test, defined as:
case class Test(test: String, tester: Double)
Running the code below does not produce any output:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}

object TestFanOut extends App {
  implicit val actorSystem = ActorSystem()

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
  val output = Sink.foreach[(Test, Test)](println)
  val input = Source.repeat(Test("1", 23)).take(3)

  case class Test(test: String, tester: Double)

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[Test](2))
      val zip = builder.add(Zip[Test, Test])
      input ~> broadcast
      broadcast.out(0) ~> filter1 ~> zip.in0
      broadcast.out(1) ~> filter2 ~> zip.in1
      zip.out ~> output
      ClosedShape
    }
  )

  graph.run()
}
However, changing the filters to:
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
produces:
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
It seems my filtering logic is not being applied correctly?
When I define the filters as:
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
I expect this output:
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0))
since the element matches the filter on one of the fan-out streams.
Answer:
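Zip requires an element from both Flows before it can emit a pair. In this case that never happens: filter1 drops every element, so Zip never receives anything on its first input and emits nothing.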
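As a rough analogy using plain Scala collections rather than Akka Streams itself, zipping an empty list with a non-empty one also yields nothing. The sketch below applies the same two predicates to a list of three elements; the object name ZipAnalogy is made up for illustration.

```scala
object ZipAnalogy {
  case class Test(test: String, tester: Double)

  // Three identical elements, mirroring Source.repeat(...).take(3)
  val input: List[Test] = List.fill(3)(Test("1", 23))

  // Same predicates as filter1 and filter2 in the question
  val left: List[Test]  = input.filter(_.test.equalsIgnoreCase("2")) // no element matches
  val right: List[Test] = input.filter(_.test.equalsIgnoreCase("1")) // all three match

  // zip can only pair elements that exist on both sides
  val zipped: List[(Test, Test)] = left.zip(right)
}
```

Here zipped is empty even though right holds three elements, which mirrors why the Zip stage in the graph emits no pairs.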
Instead, you could use Merge:
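// Merge is not in the question's import list, so it has to be imported as well
import akka.stream.scaladsl.Merge

val output = Sink.foreach[Test](println)

val graph = RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Test](2))
    val merge = builder.add(Merge[Test](2))
    input ~> broadcast
    // Each branch is filtered independently; Merge forwards whatever survives
    broadcast.out(0) ~> filter1 ~> merge
    broadcast.out(1) ~> filter2 ~> merge
    merge ~> output
    ClosedShape
  }
)

graph.run()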
This would print:
Test(1,23.0)
Test(1,23.0)
Test(1,23.0)
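To check this result programmatically rather than by reading println output, here is a minimal sketch that collects the merged elements with Sink.seq. It assumes Akka 2.6 or later, where the implicit ActorSystem supplies the materializer, and it uses the linear merge operator on Source in place of the GraphDSL wiring. That substitution should be equivalent here only because this particular source can be materialized twice with the same elements. MergeSketch and run are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeSketch {
  case class Test(test: String, tester: Double)

  // Runs the two filtered branches, merges them, and returns what came out
  def run(): Seq[Test] = {
    implicit val system: ActorSystem = ActorSystem("merge-sketch")
    try {
      val input   = Source.repeat(Test("1", 23)).take(3)
      val filter1 = Flow[Test].filter(_.test.equalsIgnoreCase("2")) // drops everything
      val filter2 = Flow[Test].filter(_.test.equalsIgnoreCase("1")) // keeps everything
      val merged  = input.via(filter1).merge(input.via(filter2))
      // Blocking is acceptable in a small sketch; avoid it in production code
      Await.result(merged.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling MergeSketch.run() should return the three elements that pass filter2, since the filter1 branch contributes nothing.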
On the other hand, if you want zipped content, you could refactor using Option, so that each branch still emits one value per input element:
val filter1 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("2")))
val filter2 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("1")))
val output = Sink.foreach[(Option[Test], Option[Test])](println)
val input = Source.repeat(Option(Test("1", 23))).take(3)

val graph = RunnableGraph.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Option[Test]](2))
    val zip = builder.add(Zip[Option[Test], Option[Test]])
    input ~> broadcast
    broadcast.out(0) ~> filter1 ~> zip.in0
    broadcast.out(1) ~> filter2 ~> zip.in1
    zip.out ~> output
    ClosedShape
  }
)
This would print:
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
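A possible variation, sketched under the same Akka 2.6+ assumption, keeps the source as plain Test values and wraps each element in an Option inside the branch. The helper keepIf and the names OptionZipSketch and run are hypothetical, and the linear zip operator on Source stands in for the GraphDSL wiring.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object OptionZipSketch {
  case class Test(test: String, tester: Double)

  // Hypothetical helper: emits exactly one Option per input element,
  // None where the predicate fails, so Zip always has something to pair
  def keepIf(p: Test => Boolean): Flow[Test, Option[Test], NotUsed] =
    Flow[Test].map(t => Option(t).filter(p))

  def run(): Seq[(Option[Test], Option[Test])] = {
    implicit val system: ActorSystem = ActorSystem("option-zip-sketch")
    try {
      val input  = Source.repeat(Test("1", 23)).take(3)
      val first  = input.via(keepIf(_.test.equalsIgnoreCase("2")))
      val second = input.via(keepIf(_.test.equalsIgnoreCase("1")))
      Await.result(first.zip(second).runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Because neither branch drops elements any more, the two sides stay in step and every input element produces one pair.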