Home > Net >  Akka stream fails when throttled
Akka stream fails when throttled

Time:12-09

I'm using Akka Streams to stream lines of text from a file. All I'm trying to do currently is print each line to stdout. I'm able to get this working without any throttling, i.e. all the lines print to stdout immediately. But when I try to throttle the stream, for example printing 1 line per second, the stream appears to cancel after printing the first line to std out.

object FStream {

  // Make config implicit.
  implicit val conf = ConfigUtils.loadAppConfig[ArrivalsAppConfig]("arrivals")

  import AkkaStreamUtils.defaultActorSystem._

  // Source of raw data
  def rawDataStream(path: String): Source[ByteString, Future[IOResult]] = {

    val file = Paths.get(path)

    val ioRes: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)

    ioRes
  }

  def main(args: Array[String]): Unit = {

    val eSource: Source[ByteString, Future[IOResult]] = 
      rawDataStream(conf.eventsFilePath)

    val eFlow = 
      Flow[ByteString]
        .via(Framing.delimiter(ByteString(System.lineSeparator), 10000))
        .map(bs => bs.utf8String)
        .throttle(1, 1.second, 1, ThrottleMode.shaping)

    val eSink = Sink.foreach(println)

    eSource.via(eFlow).runWith(eSink)
  }
}

Input File

James
Is
My
First
Name

Expected Result

$ sbt run
James # 0s
Is    # 1s
My    # 2s
First # 3s
Name  # 2s

Actual Result

$ sbt run
James # 0s

CodePudding user response:

Your main method is not waiting for the stream completion.

runWith with a Sink.foreach gives you a Future that you need to await on:

val f = eSource.via(eFlow).runWith(eSink)

Await.result(f, 10.seconds)

See documentation at https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html

The sink materializes into a Future[Done] which completes when the stream completes, or fails if the stream fails.

  • Related