I'm using Akka Streams to stream lines of text from a file. For now, all I want to do is print each line to stdout. Without throttling this works: all the lines print to stdout immediately. But when I throttle the stream, for example to one line per second, it appears to cancel after printing the first line to stdout.
object FStream {
  // Make config implicit.
  implicit val conf = ConfigUtils.loadAppConfig[ArrivalsAppConfig]("arrivals")
  import AkkaStreamUtils.defaultActorSystem._

  // Source of raw data
  def rawDataStream(path: String): Source[ByteString, Future[IOResult]] = {
    val file = Paths.get(path)
    val ioRes: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)
    ioRes
  }

  def main(args: Array[String]): Unit = {
    val eSource: Source[ByteString, Future[IOResult]] =
      rawDataStream(conf.eventsFilePath)
    val eFlow =
      Flow[ByteString]
        .via(Framing.delimiter(ByteString(System.lineSeparator), 10000))
        .map(bs => bs.utf8String)
        .throttle(1, 1.second, 1, ThrottleMode.shaping)
    val eSink = Sink.foreach(println)
    eSource.via(eFlow).runWith(eSink)
  }
}
Input File
James
Is
My
First
Name
Expected Result
$ sbt run
James # 0s
Is # 1s
My # 2s
First # 3s
Name # 4s
Actual Result
$ sbt run
James # 0s
CodePudding user response:
Your main method is not waiting for the stream to complete. runWith with a Sink.foreach gives you a Future that you need to await:
val f = eSource.via(eFlow).runWith(eSink)
Await.result(f, 10.seconds)
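For context, here is a fuller sketch of the same fix. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to supply the materializer. The names ThrottledLines and printThrottled, the actor system name, and the in-memory source standing in for the file are illustrative and not taken from the question:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ThrottledLines {
  // Hypothetical helper: splits the bytes into lines, lets at most one line
  // through per second, prints each line, and returns the Future[Done] that
  // Sink.foreach materializes.
  def printThrottled(bytes: Source[ByteString, _])(implicit system: ActorSystem): Future[Done] =
    bytes
      // allowTruncation = true accepts a last line without a trailing delimiter
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true))
      .map(_.utf8String)
      .throttle(1, 1.second, 1, ThrottleMode.shaping)
      .runWith(Sink.foreach[String](println))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("throttled-lines") // illustrative name

    // In-memory stand-in for FileIO.fromPath(...), so the sketch needs no input file.
    val bytes = Source.single(ByteString("James\nIs\nMy\nFirst\nName\n"))

    try {
      // Block the main thread until the stream has finished (or 10 seconds pass).
      Await.result(printThrottled(bytes), 10.seconds)
    } finally {
      // Shut the actor system down so the JVM can exit.
      system.terminate()
    }
  }
}
```

The timeout has to cover the whole run: five lines at one line per second need about four seconds, so 10 seconds is enough here, but a longer file would need a longer limit.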
See documentation at https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html
The sink materializes into a Future[Done] which completes when the stream completes, or fails if the stream fails.
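Both outcomes of that Future[Done] can be observed with a small sketch like the one below. It again assumes Akka 2.6 or later; SinkForeachOutcome, outcome, and the sample sources are hypothetical names chosen for illustration:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object SinkForeachOutcome {
  // Hypothetical helper: runs the source into Sink.foreach, waits for the
  // materialized Future[Done], and captures its result as a Try.
  def outcome(source: Source[String, _])(implicit system: ActorSystem): Try[Done] = {
    val done: Future[Done] = source.runWith(Sink.foreach[String](println))
    Try(Await.result(done, 10.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sink-foreach-outcome") // illustrative name
    try {
      val healthy = Source(List("James", "Is"))
      // The map stage throws on the empty string, which fails the stream.
      val broken = Source(List("James", "")).map { s =>
        if (s.isEmpty) throw new IllegalArgumentException("empty line") else s
      }

      for (src <- List(healthy, broken)) {
        outcome(src) match {
          case Success(_)  => println("stream completed")
          case Failure(ex) => println(s"stream failed: ${ex.getMessage}")
        }
      }
    } finally {
      system.terminate()
    }
  }
}
```

In the program from the question, awaiting the future has the same side benefit: a stream failure, for example a line longer than the 10000-byte limit passed to Framing.delimiter, would surface as an exception from Await.result instead of going unnoticed.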