I am trying to filter out common elements in terms of latest versions of the object and return another Source. My object looks like:
case class Record(id: String, version: Long)
My method's input are two Source:
val sourceA: Source[Record, _] = <>
val sourceB: Source[Record, _] = <>
sourceA
and sourceB
has common id
of the Record
object but there is a possibility that version
s are different in both. I want to create a method which returns a Source[Record, _]
which will have latest version for an id. I tried
val latestCombinedSource: Source[Record, _] = sourceA map {each => {
sourceB.map(eachB => eachB.version > each.version? eachB: each)
.....
}
}
CodePudding user response:
You did not mention what type of Source
/ what streaming library you are asking about (please update your question to clarify that). From the signatures in the code, I assume that this is about akka-stream. If that is correct, then you probably want to use zipLatestWith:
val latestCombinedSource: Source[Record, _] =
sourceA.zipLatestWith(sourceB) { (a, b) =>
if (a.version > b.version) a else b
}
Note that there is also zipWith and I'm not 100% sure which one you'd want to use. The difference (quoted from the API docs) is: zipWithLatest
"Emits when all of the inputs have at least an element available, and then each time an element becomes available on either of the inputs" while zipWith
"Emits when all of the inputs have an element available".