How to handle backpressure when Streaming file from s3 with actor interop

I am trying to download a large file from S3 and send its data to another actor that makes an HTTP request, and then persist the response. I want to limit the number of requests sent by that actor, hence I need to handle backpressure.
I tried doing something like this:

S3.download(bckt, bcktKey).map {
  case Some((file, _)) =>
    file
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String))
      .drop(1) // drop headers
      .map(p => Foo(p.head, p(1)))
      .mapAsync(30) { p =>
        implicit val askTimeout: Timeout = Timeout(10.seconds)
        (httpClientActor ? p).mapTo[Buzz]
      }
      .mapAsync(1) {
        case b @ Buzz(_, _) =>
          (persistActor ? b).mapTo[Done]
      }
      .runWith(Sink.head)
}

The problem is that it reads only 30 lines from the file, which matches the parallelism limit set on mapAsync. I am not sure this is the correct way to achieve what I'm looking for.

CodePudding user response:

If the cause is not the use of Sink.head, as I mentioned in the comment, you can backpressure the stream using Sink.actorRefWithBackpressure.

Sample code:

class PersistActor extends Actor {

  override def receive: Receive = {
    case "init" =>
      println("Initialized")
      sender() ! Done // ack the init message so the stream starts emitting elements
    case "complete" =>
      context.stop(self)
    case message =>
      // persist the Buzz here, then ack so the stream sends the next element
      sender() ! Done
  }
}

val sink = Sink
  .actorRefWithBackpressure(persistActor, "init", Done, "complete", PartialFunction.empty)

S3.download(bckt, bcktKey).map {
  case Some((file, _)) =>
    file
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String))
      .drop(1) // drop headers
      .map(p => Foo(p.head, p(1)))
      // you could backpressure here too...
      .mapAsync(30) { p =>
        implicit val askTimeout: Timeout = Timeout(10.seconds)
        (httpClientActor ? p).mapTo[Buzz]
      }
      .to(sink)
      .run()
}

CodePudding user response:

As Johny notes in his comment, Sink.head is what causes the stream to process only about 30 elements. Roughly, what happens is:

  • Sink.head signals demand for 1 element
  • this demand propagates up through the second mapAsync
  • when the demand reaches the first mapAsync, since that one has parallelism 30, it signals demand for 30 elements
  • the CSV parsing stages emit 30 elements
  • when the client actor's response to the ask for the first element arrives, that response propagates down to the ask of the persist actor
  • demand is signaled for one more element from the CSV parsing stages
  • when the persist actor responds, the response goes to the sink
  • since the sink is Sink.head which cancels the stream once it receives an element, the stream gets torn down
  • any asks of the client actor which have been sent but are awaiting a response will still get processed

There's a bit of a race between the persist actor's response (which completes the sink and tears the stream down) and the CSV parsing plus the ask to the client actor for the next element: if the latter is faster, a 31st line might get processed by the client actor.
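
To see the effect in isolation, here is a small self-contained sketch of the same shape (not the original code; it assumes Akka 2.6, where an implicit ActorSystem provides the materializer). Sink.head signals demand for one element, but the mapAsync(30) in front of it pulls up to 30 elements from upstream before the cancellation arrives:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object SinkHeadDemo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  import system.dispatcher

  Source(1 to 1000)
    .map { i => println(s"parsed line $i"); i }   // stands in for the CSV stages
    .mapAsync(30)(i => Future(i))                 // stands in for the client-actor ask
    .runWith(Sink.head)                           // demands 1 element, then cancels upstream
    .foreach { first =>
      println(s"stream completed with element $first") // typically ~30 "parsed line" messages were printed
      system.terminate()
    }
}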

If you just want a Future[Done] after every element has been processed, Sink.last will work very well with this code.
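
For illustration, here is a minimal sketch of that variant, assuming the same identifiers, imports and actors as in the question; the only change is the final sink:

S3.download(bckt, bcktKey).map {
  case Some((file, _)) =>
    file
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String))
      .drop(1) // drop headers
      .map(p => Foo(p.head, p(1)))
      .mapAsync(30) { p =>
        implicit val askTimeout: Timeout = Timeout(10.seconds)
        (httpClientActor ? p).mapTo[Buzz]
      }
      .mapAsync(1) { b =>
        (persistActor ? b).mapTo[Done]
      }
      .runWith(Sink.last) // Future[Done] that completes once the last element has been persisted
}

Unlike Sink.head, Sink.last keeps demanding elements until upstream completes, so every line is parsed, sent to the client actor and persisted before the materialized future completes.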
