Home > front end >  Adding values from a Map to an existing Source in Scala
Adding values from a Map to an existing Source in Scala

Time:01-14

I am using akka streams. Can we add values from a map to an existing source in scala? I want to merge all the values from a Future[Map[String, A]] and a source of A Source[A,_] which are not already in the source. My method signature is:

case class A(id: String, someValue: String)
def addValuesToExistingSource(mySource: Source[A,_], myMapFuture: Future[Map[String, A]]): Source[A, _] = {
..//This is to be implemented
}

Thanks.

CodePudding user response:

Is this what you are looking for?

def addValuesToExistingSource(mySource: Source[A, _], myMapFuture: Future[Map[String, A]]): Source[A, _] = {
  //Change logic for values here
  val sf = Source.futureSource(myMapFuture.map(map => Source(map.values.toList)))
  Source.combine(mySource, sf)(Concat(_))
}

CodePudding user response:

I assume that the keys of that map represent "ids" as in A.id. That would be much easier to understand if you'd introduce a typedef:

type Id = String
case class A(id: Id, someValue: String)

If I understood correctly, you want a source that first emits all As from mySource and then (after mySource completed) emits all those values from the Map (from the Future) for which the key does not match any of the ids from the items of mySource?

Approach one: keep removing entries from the map and then emit the rest when mySource completes:

def addValuesToExistingSource(
    mySource: Source[A, _],
    myMap: Map[Id, A]
): Source[A, _] = {
  // map to Right and append a single Source that will signal that mySource completed
  val sourceWithCompleteItem: Source[Either[Unit, A], _] =
    mySource.map(a => Right(a))    Source.single(Left(()))

  // use statefulMapConcat
  sourceWithCompleteItem.statefulMapConcat { () =>
    var remainingMap = myMap

    {
      case Right(a: A) =>
        // mySource emitted another item -> remove entry of same id from remainingMap
        remainingMap -= a.id
        // ...and emit the item to downstream
        Iterable(a)
      case Left(_) =>
        // mySource completed -> emit the remaining values from the map
        remainingMap.values
    }
  }
}

def addValuesToExistingSource(
    mySource: Source[A, _],
    myMapFuture: Future[Map[Id, A]]
): Source[A, _] = {
  Source
    .future(myMapFuture)
    .flatMapConcat(addValuesToExistingSource(mySource, _))
}

Obviously, this has the downside that nothing will be emitted before the future completes.

Approach two: accumulate a set of all ids that were emitted:

def addValuesToExistingSourceB(
    mySource: Source[A, _],
    myMapFuture: Future[Map[Id, A]]
): Source[A, _] = {

  // map to Right and append a single Source that will signal that mySource completed
  val sourceWithCompleteItem: Source[Either[Unit, A], _] =
    mySource.map(a => Right(a))    Source.single(Left(()))

  // accumulator type for scan:
  case class ScanAcc(
      emittedIds: Set[Id] = Set.empty,
      isCompleted: Boolean = false,
      itemOpt: Option[A] = None
  )

  sourceWithCompleteItem
    .scan(ScanAcc()) {
      case (acc, Right(a)) =>
        ScanAcc(acc.emittedIds   a.id, isCompleted = false, Some(a))
      case (acc, Left(_)) =>
        ScanAcc(acc.emittedIds, isCompleted = true, None)
    }
    .flatMapConcat { scanAcc =>
      if (scanAcc.isCompleted) {
        Source
          .future(myMapFuture)
          .mapConcat(map => (map -- scanAcc.emittedIds).values)
      } else {
        scanAcc.itemOpt.fold[Source[A, _]](Source.empty)(Source.single)
      }
    }
}

This would emit before the future completes, but the downside is that you have this set in memory (which could become a problem depending how many items your source emits).

Here's that code with runnable examples: https://scastie.scala-lang.org/T6fd1tseTVmjGhcQfvoL0A

  • Related