Home > Net >  How to generate the materialized value from the elements in Source or Flow?
How to generate the materialized value from the elements in Source or Flow?

Time:10-01

Suppose there is a source of type Source[Int, NotUsed]. How can this be turned into a Source[Int, T] where the materialized value T is computed based on the elements of the source?

Example: I would like summing the elements from a stream; how to implement dumbFlow so result should be 6 instead of 42?

val dumbFlow = ??? //Flow[Int].mapMaterializedValue(_ => 42)

//code below cannot be changed
val source = Source(List(1, 2, 3)).viaMat(dumbFlow)(Keep.right)
val result = source.toMat(Sink.ignore)(Keep.left).run() 
//result: Int = 42

I know how to achieve the same result using Sink.fold or Sink.head but I need the materialization logic in the Source; cannot change .to(Sink.ignore).

CodePudding user response:

Strictly speaking, the materialized value is always computed (including any mapMaterializedValue/toMat/viaMat etc.) before a single element goes through the stream and thus cannot depend on the elements of the stream.

If the materialized value happens to be a Future (in the Scala API), the future can be constructed (though not yet completed) and the stream can complete the Future based on the elements. In general, the Future materialized values are from sinks (e.g., as you note Sink.fold/Sink.head).

The alsoTo operator on a Source or Flow lets you embed a Sink to the side of a Source/Flow. It has an alsoToMat companion which lets you combine the Sink's materialized value with the Source/Flow's.

So one could have

val summingSink = Sink.fold[Int, Int](0)(_   _)
val dumbFlow: Flow[Int, Int, Future[Int]] = Flow[Int].alsoToMat(summingSink)(Keep.right)
val result: Future[Int] = source.toMat(Sink.ignore)(Keep.left).run()
result.foreach(println _)
// will eventually print 6
  • Related