Akka - convert Flow into Collection or Publisher

Time:12-01

I'm trying to split an Akka Source into two separate ones.

  val requestFlow = Flow[BodyPartEntity].to(Sink.seq) // convert to Seq[BodyPartEntity]
  val dataFlow    = Flow[BodyPartEntity].to(Sink.asPublisher(fanout = false)) // convert to Publisher[BodyPartEntity]

  implicit class EitherSourceExtension[L, R, Mat](source: Source[FormData.BodyPart, Mat]) {
    def partition(left: Sink[BodyPartEntity, NotUsed], right: Sink[BodyPartEntity, NotUsed]): Graph[ClosedShape, NotUsed] = {
      GraphDSL.create() { implicit builder =>
        import akka.stream.scaladsl.GraphDSL.Implicits._
        val partition = builder.add(Partition[FormData.BodyPart](2, element => if (element.getName == "request") 0 else 1))
        source ~> partition.in
        partition.out(0).map(_.getEntity) ~> left
        partition.out(1).map(_.getEntity) ~> right
        ClosedShape
      }
    }
  }

How can I convert requestFlow into a Seq[BodyPartEntity] and dataFlow into a Publisher[BodyPartEntity]?

Answer:

You could use a BroadcastHub for this. From the documentation:

A BroadcastHub can be used to consume elements from a common producer by a dynamic set of consumers.

Simplified code:

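import akka.NotUsed
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Source}

// Assumes an implicit ActorSystem (Akka 2.6+) or Materializer is in scope.

// Materializing the graph starts the producer and returns a Source
// that any number of consumers can attach to.
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
  Source(1 to 5).toMat(
    BroadcastHub.sink(bufferSize = 4))(Keep.right)

val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

// Process the messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))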
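If the goal is specifically to get a Seq from one branch and a Publisher from the other, another option is to keep the Partition approach from the question and expose the sinks' materialized values. Flow[...].to(sink) keeps the left materialized value (NotUsed), so the Future from Sink.seq and the Publisher from Sink.asPublisher are discarded. Below is a minimal sketch, not a drop-in fix: Part, PartitionSketch and split are hypothetical names, Part stands in for FormData.BodyPart, and the code assumes Akka 2.6+, where an implicit ActorSystem provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import org.reactivestreams.Publisher

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object PartitionSketch {
  // Hypothetical stand-in for FormData.BodyPart: a name plus its entity.
  final case class Part(name: String, payload: String)

  // Builds a closed graph whose materialized value is the pair
  // (Future of the "request" elements, Publisher of everything else).
  def split(parts: Source[Part, Any])
      : RunnableGraph[(Future[immutable.Seq[String]], Publisher[String])] = {
    val requestSink = Sink.seq[String]
    val dataSink    = Sink.asPublisher[String](fanout = false)

    RunnableGraph.fromGraph(
      // Passing the sinks to create() imports them into the graph and
      // combines their materialized values with the (_, _) function.
      GraphDSL.create(requestSink, dataSink)((_, _)) { implicit builder => (left, right) =>
        import GraphDSL.Implicits._
        val partition =
          builder.add(Partition[Part](2, p => if (p.name == "request") 0 else 1))
        parts ~> partition.in
        partition.out(0).map(_.payload) ~> left
        partition.out(1).map(_.payload) ~> right
        ClosedShape
      })
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("partition-sketch")
    try {
      val parts = Source(List(Part("request", "r1"), Part("data", "d1"), Part("data", "d2")))
      val (requestsF, dataPublisher) = split(parts).run()

      // The publisher has to be subscribed to; otherwise the data branch
      // never signals demand and the Partition stage stalls.
      val dataF = Source.fromPublisher(dataPublisher).runWith(Sink.seq)

      println(Await.result(requestsF, 3.seconds))
      println(Await.result(dataF, 3.seconds))
    } finally system.terminate()
  }
}
```

Note that the Seq only becomes available once the whole stream completes, and the stream can only complete if something consumes the Publisher side as well, so both results are tied to the same run of the graph.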