I am using Akka Streams. Can we add values from a map to an existing source in Scala? Given a Source[A, _] and a Future[Map[String, A]], I want to append to the source all the values from the map which are not already in the source. My method signature is:
case class A(id: String, someValue: String)

def addValuesToExistingSource(mySource: Source[A, _], myMapFuture: Future[Map[String, A]]): Source[A, _] = {
  ??? // This is to be implemented
}
Thanks.
CodePudding user response:
Is this what you are looking for?
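def addValuesToExistingSource(mySource: Source[A, _], myMapFuture: Future[Map[String, A]]): Source[A, _] = {
  // Change the logic for selecting values here.
  // Note: Future.map needs an implicit ExecutionContext in scope.
  val sf = Source.futureSource(myMapFuture.map(map => Source(map.values.toList)))
  // Emit everything from mySource first, then the values from the map
  Source.combine(mySource, sf)(Concat(_))
}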
CodePudding user response:
I assume that the keys of that map represent "ids" as in A.id. That would be much easier to understand if you introduced a type alias:
type Id = String
case class A(id: Id, someValue: String)
If I understood correctly, you want a source that first emits all As from mySource and then (after mySource has completed) emits all those values from the Map (from the Future) for which the key does not match any of the ids of the items from mySource?
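To pin down that reading before getting to the streaming versions, here is a minimal sketch of the same semantics on plain collections, with no Akka involved. The names MergeSemantics and expected are hypothetical and only serve as illustration, and the sketch assumes (as above) that each map key equals the id of its value.

```scala
// Plain-collections model of the intended result (illustration only, no streaming).
final case class A(id: String, someValue: String)

object MergeSemantics {
  // Hypothetical helper: everything from the source, in order, followed by
  // those map values whose key was not seen as an id in the source.
  def expected(fromSource: Seq[A], fromMap: Map[String, A]): Seq[A] = {
    val seenIds = fromSource.map(_.id).toSet
    fromSource ++ fromMap.filterNot { case (id, _) => seenIds(id) }.values
  }
}
```

For example, with source items with ids "1" and "2" and a map with keys "2" and "3", only the map entry for "3" is appended after the two source items. Either streaming approach should produce the same elements as this model once both the source and the future have completed.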
Approach one: keep removing entries from the map and then emit the rest when mySource completes:
def addValuesToExistingSource(
  mySource: Source[A, _],
  myMap: Map[Id, A]
): Source[A, _] = {
  // map to Right and append a single-element Source that signals that mySource completed
  val sourceWithCompleteItem: Source[Either[Unit, A], _] =
    mySource.map(a => Right(a)) ++ Source.single(Left(()))
  // use statefulMapConcat: the state (remainingMap) is created once per materialization
  sourceWithCompleteItem.statefulMapConcat { () =>
    var remainingMap = myMap
    // (keep the blank line below so the block is not parsed as an argument to myMap)

    {
      case Right(a: A) =>
        // mySource emitted another item -> remove entry of same id from remainingMap
        remainingMap -= a.id
        // ...and emit the item to downstream
        Iterable(a)
      case Left(_) =>
        // mySource completed -> emit the remaining values from the map
        remainingMap.values
    }
  }
}

def addValuesToExistingSource(
  mySource: Source[A, _],
  myMapFuture: Future[Map[Id, A]]
): Source[A, _] = {
  Source
    .future(myMapFuture)
    .flatMapConcat(addValuesToExistingSource(mySource, _))
}
Obviously, this has the downside that nothing will be emitted before the future completes.
Approach two: accumulate a set of all ids that were emitted:
def addValuesToExistingSourceB(
  mySource: Source[A, _],
  myMapFuture: Future[Map[Id, A]]
): Source[A, _] = {
  // map to Right and append a single-element Source that signals that mySource completed
  val sourceWithCompleteItem: Source[Either[Unit, A], _] =
    mySource.map(a => Right(a)) ++ Source.single(Left(()))
  // accumulator type for scan:
  case class ScanAcc(
    emittedIds: Set[Id] = Set.empty,
    isCompleted: Boolean = false,
    itemOpt: Option[A] = None
  )
  sourceWithCompleteItem
    .scan(ScanAcc()) {
      case (acc, Right(a)) =>
        // remember the id and pass the item along
        ScanAcc(acc.emittedIds + a.id, isCompleted = false, Some(a))
      case (acc, Left(_)) =>
        // mySource completed -> keep the ids, nothing to pass along
        ScanAcc(acc.emittedIds, isCompleted = true, None)
    }
    .flatMapConcat { scanAcc =>
      if (scanAcc.isCompleted) {
        // emit the map values whose ids were not emitted by mySource
        Source
          .future(myMapFuture)
          .mapConcat(map => (map -- scanAcc.emittedIds).values)
      } else {
        scanAcc.itemOpt.fold[Source[A, _]](Source.empty)(Source.single)
      }
    }
}
This emits items before the future completes, but the downside is that the set of all emitted ids is kept in memory, which could become a problem depending on how many items your source emits.
Here's that code with runnable examples: https://scastie.scala-lang.org/T6fd1tseTVmjGhcQfvoL0A