Suppose there is a source of type Source[Int, NotUsed]
. How can this be turned into a Source[Int, T]
where the materialized value T
is computed based on the elements of the source?
Example: I would like summing the elements from a stream; how to implement dumbFlow
so result should be 6 instead of 42?
val dumbFlow = ??? //Flow[Int].mapMaterializedValue(_ => 42)
//code below cannot be changed
val source = Source(List(1, 2, 3)).viaMat(dumbFlow)(Keep.right)
val result = source.toMat(Sink.ignore)(Keep.left).run()
//result: Int = 42
I know how to achieve the same result using Sink.fold
or Sink.head
but I need the materialization logic in the Source
; cannot change .to(Sink.ignore)
.
CodePudding user response:
Strictly speaking, the materialized value is always computed (including any mapMaterializedValue
/toMat
/viaMat
etc.) before a single element goes through the stream and thus cannot depend on the elements of the stream.
If the materialized value happens to be a Future
(in the Scala API), the future can be constructed (though not yet completed) and the stream can complete the Future
based on the elements. In general, the Future
materialized values are from sinks (e.g., as you note Sink.fold
/Sink.head
).
The alsoTo
operator on a Source
or Flow
lets you embed a Sink
to the side of a Source
/Flow
. It has an alsoToMat
companion which lets you combine the Sink
's materialized value with the Source
/Flow
's.
So one could have
val summingSink = Sink.fold[Int, Int](0)(_ _)
val dumbFlow: Flow[Int, Int, Future[Int]] = Flow[Int].alsoToMat(summingSink)(Keep.right)
val result: Future[Int] = source.toMat(Sink.ignore)(Keep.left).run()
result.foreach(println _)
// will eventually print 6