akka streaming file lines to actor router and writing with single actor. how to handle the backpressure

Time:10-12

I want to stream a file from S3 to actors that parse and enrich each line, and then write the output to another file. The number of parser actors should be limited, e.g. via application.conf:

akka{
    actor{
        deployment {
              HereClient/router1 {
                router = round-robin-pool
                nr-of-instances = 28
              }
        }
    }
}

code

val writerActor = actorSystem.actorOf(WriterActor.props())
val parser = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")

However, the actor that writes to the file should be limited to one instance (a singleton).

I tried doing something like

val reader: ParquetReader[GenericRecord] =  AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()

  val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.map(record => parser ! record)

But I am not sure that backpressure is handled correctly. Any advice?

CodePudding user response:

I think you should be using one of the "async" operations.

Perhaps this other Q&A gives you some inspiration: "Processing an akka stream asynchronously and writing to a file sink".

CodePudding user response:

Indeed, your solution disregards backpressure: sending with ! is fire-and-forget, so the stream never learns whether the actors are keeping up.

The correct way to have a stream interact with an actor while maintaining backpressure is to use the ask pattern support of akka-stream (reference).

From my understanding of your example you have 2 separate actor interaction points:

  1. send records to the parsing actors (via a router)
  2. send parsed records to the singleton write actor

What I would do is something similar to the following:

val writerActor = actorSystem.actorOf(WriterActor.props())
val parserActor = actorSystem.actorOf(FromConfig.props(ParsingActor.props(writerActor)), "router1")

val reader: ParquetReader[GenericRecord] =  AvroParquetReader.builder[GenericRecord](file).withConf(conf).build()

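// ask needs an implicit akka.util.Timeout (plus scala.concurrent.duration._);
// 5 seconds is only an example value, not something from the question
implicit val timeout: Timeout = 5.seconds

val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
source.ask[ParsedRecord](28)(parserActor) // at most 28 unanswered asks to the router
      .ask[WriteAck](writerActor)         // the writer acknowledges each parsed record
      .runWith(Sink.ignore)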

The idea is that you send every GenericRecord element to the parserActor, which replies with a ParsedRecord. As an example, we specify a parallelism of 28, since that is the number of instances you have configured. As long as the value is at least the number of actor instances, no actor should suffer from work starvation.

Once the parserActor replies with the parsing result (represented here by the ParsedRecord), we apply the same pattern to interact with the singleton writer actor. Note that here we don't specify the parallelism: we have a single instance, so it doesn't make sense to send more than 1 message at a time (in reality this happens anyway due to buffering at async boundaries, but that is just a built-in optimization). In this case we expect the writer actor to reply with a WriteAck, informing us that the write succeeded and that we can send the next element.

Using this method you are maintaining backpressure throughout your whole stream.
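For the ask stages to work, both actors have to reply to the sender of each message; the parser must not forward to the writer itself. Below is a minimal, self-contained sketch of that under some assumptions: Akka 2.6+ classic actors, a plain list of strings standing in for the Parquet source, a StringBuilder standing in for the output file, a pool of 4 instead of 28, and made-up definitions for ParsedRecord and WriteAck (the snippets above only name them). Treat it as an illustration, not as the exact code you need.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinPool
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message types: the answer only names ParsedRecord and WriteAck.
final case class ParsedRecord(value: String)
sealed trait WriteAck
case object WriteAck extends WriteAck

// Parses one element and replies to the asker (the stream), not to the writer.
class ParsingActor extends Actor {
  def receive: Receive = {
    case line: String => sender() ! ParsedRecord(line.trim.toUpperCase)
  }
}

// Single writer: acknowledges only after the "write" has been done.
class WriterActor extends Actor {
  private val out = new StringBuilder // stand-in for the real output file
  def receive: Receive = {
    case ParsedRecord(value) =>
      out.append(value).append('\n')
      sender() ! WriteAck
  }
}

object AskPipeline {
  // Runs the pipeline over `lines` and returns how many WriteAck replies came back.
  def runPipeline(lines: List[String]): Int = {
    implicit val system: ActorSystem = ActorSystem("ask-demo")
    implicit val timeout: Timeout = 5.seconds // assumed value; tune for real workloads

    val parser = system.actorOf(RoundRobinPool(4).props(Props[ParsingActor]()), "parsers")
    val writer = system.actorOf(Props[WriterActor](), "writer")
    try {
      val acks = Source(lines)
        .ask[ParsedRecord](4)(parser) // up to 4 unanswered asks to the router
        .ask[WriteAck](writer)        // next record is sent as acks come back
        .runWith(Sink.fold[Int, WriteAck](0)((count, _) => count + 1))
      Await.result(acks, 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(runPipeline(List(" a ", " b ", " c ")))
}
```

If an actor does not reply within the timeout, the ask stage fails the stream, so pick a timeout that comfortably covers your slowest parse or write.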
