I am trying to download a large file from S3, send its data to another actor that makes an HTTP request, and then persist the response. I want to limit the number of requests sent by that actor, hence I need to handle backpressure.
I tried doing something like this:
S3.download(bckt, bcktKey).map {
  case Some((file, _)) =>
    file
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String)).drop(1) // drop headers
      .map(p => Foo(p.head, p(1)))
      .mapAsync(30) { p =>
        implicit val askTimeout: Timeout = Timeout(10.seconds)
        (httpClientActor ? p).mapTo[Buzz]
      }
      .mapAsync(1) {
        case b @ Buzz(_, _) =>
          (persistActor ? b).mapTo[Done]
      }
      .runWith(Sink.head)
}
The problem is that it reads only 30 lines from the file, which matches the limit I set for parallelism. I am not sure this is the correct way to achieve what I'm looking for.
CodePudding user response:
If the reason is not the usage of Sink.head, as I mentioned in the comment, you can backpressure the stream using Sink.actorRefWithBackpressure.
Sample code:
import akka.Done
import akka.actor.Actor

class PersistActor extends Actor {
  override def receive: Receive = {
    case "init" =>
      println("Initialized")
      sender() ! Done // ack the init message so the first element is delivered
    case "complete" =>
      context.stop(self)
    case message =>
      // persist the incoming Buzz here...
      sender() ! Done // ack each element so the stream sends the next one
  }
}
val sink = Sink
  .actorRefWithBackpressure(persistActor, "init", Done, "complete", PartialFunction.empty)
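Sink.actorRefWithBackpressure first sends the init message ("init" here) and expects it to be acknowledged, then delivers one stream element at a time, waiting for the ack message (Done) before pulling the next; the complete message ("complete") is sent when the stream finishes. If the actor never replies with the ack, the stream stalls.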
S3.download(bckt, bcktKey).map {
  case Some((file, _)) =>
    file
      .via(CsvParsing.lineScanner())
      .map(_.map(_.utf8String)).drop(1) // drop headers
      .map(p => Foo(p.head, p(1)))
      // you could backpressure here too...
      .mapAsync(30) { p =>
        implicit val askTimeout: Timeout = Timeout(10.seconds)
        (httpClientActor ? p).mapTo[Buzz]
      }
      .to(sink)
      .run()
}
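As a variant of the "backpressure here too" comment: classic Akka Streams also has an ask operator on sources and flows that keeps a bounded number of asks in flight and backpressures for you. A minimal sketch, reusing file, sink, Foo, Buzz and httpClientActor from the snippet above:

import akka.util.Timeout
import scala.concurrent.duration._

implicit val askTimeout: Timeout = Timeout(10.seconds)

file
  .via(CsvParsing.lineScanner())
  .map(_.map(_.utf8String)).drop(1) // drop headers
  .map(p => Foo(p.head, p(1)))
  .ask[Buzz](30)(httpClientActor) // at most 30 asks in flight; pulls more only as replies arrive
  .to(sink)
  .run()

Note that ask fails the stream if the actor does not reply within the timeout, which may or may not be what you want compared to mapAsync plus your own recovery.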
CodePudding user response:
As Johny notes in his comment, the Sink.head is what causes the stream to only process about 30 elements. What happens is approximately:

- Sink.head signals demand for 1 element
- this demand propagates up through the second mapAsync
- when the demand reaches the first mapAsync, since that one has parallelism 30, it signals demand for 30 elements
- the CSV parsing stages emit 30 elements
- when the response to the ask with the first element from the client actor is received, the response propagates down to the ask of the persist actor
- demand is signaled for one more element from the CSV parsing stages
- when the persist actor responds, the response goes to the sink
- since the sink is Sink.head, which cancels the stream once it receives an element, the stream gets torn down
- any asks of the client actor which have been sent but are awaiting a response will still get processed
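To see this demand behaviour in isolation, here is a minimal, self-contained sketch (made-up source and numbers, not the question's pipeline); running it should print roughly 30 "pulled ..." lines even though the sink takes a single element:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("demo")
import system.dispatcher

Source(1 to 1000)
  .map { i => println(s"pulled $i"); i } // shows how many elements mapAsync pre-fetches
  .mapAsync(30)(i => Future(i * 2))
  .runWith(Sink.head) // demands one result, then cancels upstream
  .foreach(first => println(s"first result: $first"))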
There's a bit of a race between the persist actor's response and the CSV parsing stages sending the next ask to the client actor: if the latter is faster, a 31st line might get processed by the client actor before the stream is torn down.
If you just want a Future[Done] after every element has been processed, Sink.last will work very well with this code.
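Concretely, that is a one-line change to the question's code (a sketch; everything upstream stays the same):

      .mapAsync(1) {
        case b @ Buzz(_, _) =>
          (persistActor ? b).mapTo[Done]
      }
      .runWith(Sink.last) // completes with the final Done once every element is persisted

Sink.ignore would also materialize a Future[Done] directly; unlike Sink.last, it does not fail with NoSuchElementException if the stream turns out to be empty.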