I am trying to compare two tuples of objects with each other and return a Source of the latest tuples. For example, I have two objects, ObjectA and ObjectB, which come in the form of a tuple like (ObjectA, Some[ObjectB]). I also have another object, a probing source containing JoinedObject, which has (ObjectA, Some(ObjectB)). The following are the case classes:
case class ObjectA(id: String, version: Long)
case class ObjectB(id: String, version: Long)
case class JoinedObject(objectA: ObjectA, objectB: ObjectB)
In my use case, my method takes a Source[JoinedObject, _] and a tuple of (ObjectA, ObjectB) as inputs; the two objects in each tuple share the same id:
def myMethod(joinedSource: Source[JoinedObject, _], abProbePair: Future[Map[String, (ObjectA, Some(ObjectB))]]: Source[ObjectA, Option[ObjectB]] => {
//method returning a tuple after comparing both inputs according to version id
}
I want to compare ObjectA from the tuple with ObjectA of the joinedSource and ObjectB from the tuple with ObjectB of the joined source. So far I came up with this:
def myMethod(joinedSource: Source[JoinedObject, _], abProbePair: Future[Map[String, (ObjectA, Some(ObjectB))]]): Source[(ObjectA, Option[ObjectB]), _] => {
  val returnSource = abProbePair map { eachPair => {
      val commonIdToSearchInMap = eachPair.ObjectA.id //since ObjectA and ObjectB will have same id
      joinedSource map { joinedRecord =>
        val joinedRecordObjA = joinedRecord.get(commonIdToSearchInMap)._1
        val joinedRecordObjB = joinedRecord.get(commonIdToSearchInMap)._2
        //comparing ObjectA of eachPair(tuple) with the ObjectA of joined one
        val latestObjectA = if(eachPair.ObjectA.recordVersion > joinedRecordObjA.recordVersion) eachPair.objectA else joinedRecord.ObjectA
        //similar comparison for ObjectB
        ...
        //return the pair
        (latestObjectA, Some(latestObjectB))
      }
    }
  }
  Source.futureSource(returnSource)
The first error is an incorrect return type, which should be a source of the latest pair. Also, I think the comparison using the id in a map is probably not correct. Any help is appreciated here. Thanks
CodePudding user response:
You are asking at least two questions at once, and you should try to answer them separately. I strongly recommend that you learn to decompose such tasks into smaller, separate building blocks. Not only because you will get better answers on Stack Overflow, but also because it will make you a better developer and because those smaller building blocks are easier to test (especially if you separate synchronous from asynchronous logic).
Anyway, I will try to answer your questions:
Question 1: how to combine a source and a value from a future
What I think you are trying to ask is:
"I have three types A, B and C, a Source[A, _], a Future[B] and a combinator function of type (A, B) => C. How can I combine the values from the source and the value of the future into a Source[C, _] that emits the results of combining each A from the source with the B from the future using the combinator function?"
Answer (one of various options): use Source.future to lift the future into a source and then use zipLatestWith to combine them:
def combineSourceAndFuture[A, B, C, Mat](
  source: Source[A, Mat],
  future: Future[B]
)(
  combinator: (A, B) => C
): Source[C, Mat] = {
  source.zipLatestWith(Source.future(future))(combinator)
}
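One caveat worth checking against the Akka documentation for your version: as far as I know, zipLatestWith completes as soon as any upstream completes, and Source.future completes right after emitting its single value, so the zipped stream may end earlier than expected. A minimal alternative sketch, assuming Akka Streams 2.6+ is on the classpath, is to wait for the future first and then map every element of the source against the resolved value. The names CombineSketch and combineFutureThenSource are hypothetical and not part of the original answer; note also that this variant drops the materialized value of the source.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.Future

object CombineSketch {
  // Sketch only (hypothetical helper): emit the future's value once, then
  // switch to `source` and combine each of its elements with that value.
  // flatMapConcat discards the inner materialized value, hence NotUsed.
  def combineFutureThenSource[A, B, C, Mat](
    source: Source[A, Mat],
    future: Future[B]
  )(
    combinator: (A, B) => C
  ): Source[C, NotUsed] =
    Source.future(future).flatMapConcat { b =>
      source.map(a => combinator(a, b))
    }
}
```

If the future fails, the resulting source fails as well, which is usually what you want here.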
Question 2: your specific synchronous logic
What I think you are also trying to ask is:
"I have these three specific case classes ObjectA, ObjectB and JoinedObject and want to implement a function of type (JoinedObject, Map[String, (ObjectA, Option[ObjectB])]) => (ObjectA, Option[ObjectB]) that compares the ObjectAs and ObjectBs from the JoinedObject with the values from the map that have the same id and selects the one with the greater version. How can I do that?"
Your original question left out various details and I had to make some assumptions (e.g. because (ObjectA, Some(ObjectB)) is not a valid type), so I'm not sure if the solution below is 100% correct, but here you go:
def newestAandB(
  joinedObject: JoinedObject,
  map: Map[String, (ObjectA, Option[ObjectB])]
): (ObjectA, Option[ObjectB]) = {
  val a: ObjectA = {
    val thisA = joinedObject.objectA
    val aPairOpt: Option[(ObjectA, Option[ObjectB])] = map.get(thisA.id)
    aPairOpt.fold(thisA) { case (thatA, _) =>
      if (thisA.version > thatA.version) thisA else thatA
    }
  }
  // this logic always returns a b - you did not specify what should happen
  // if there is no value for the id within the map - this returns
  // joinedObject.objectB in that case
  val b: ObjectB = {
    val thisB = joinedObject.objectB
    val bPairOpt: Option[(ObjectA, Option[ObjectB])] = map.get(thisB.id)
    bPairOpt.fold(thisB) { case (_, thatBOpt) =>
      thatBOpt.fold(thisB) { thatB =>
        if (thisB.version > thatB.version) thisB else thatB
      }
    }
  }
  (a, Some(b))
}
Obviously, the above is quite redundant and leaves room for a follow-up question "how could I make this less redundant?", but I will leave that open for now.
Anyway: the good thing about this is that you can easily test it synchronously without setting up any akka stream etc.
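To illustrate the point about synchronous testing, here is a small self-contained sketch that exercises the comparison logic with plain assertions and no stream at all. The wrapper object NewestAandBCheck and the sample values are made up for illustration; the function body is a condensed copy of newestAandB with the same behavior.

```scala
object NewestAandBCheck {
  case class ObjectA(id: String, version: Long)
  case class ObjectB(id: String, version: Long)
  case class JoinedObject(objectA: ObjectA, objectB: ObjectB)

  // Condensed copy of newestAandB: prefer the joined value only when it is
  // strictly newer than the map's value; fall back to it when the map has none.
  def newestAandB(
    joinedObject: JoinedObject,
    map: Map[String, (ObjectA, Option[ObjectB])]
  ): (ObjectA, Option[ObjectB]) = {
    val thisA = joinedObject.objectA
    val a = map.get(thisA.id).fold(thisA) { case (thatA, _) =>
      if (thisA.version > thatA.version) thisA else thatA
    }
    val thisB = joinedObject.objectB
    val b = map.get(thisB.id).flatMap(_._2).fold(thisB) { thatB =>
      if (thisB.version > thatB.version) thisB else thatB
    }
    (a, Some(b))
  }

  def main(args: Array[String]): Unit = {
    val joined = JoinedObject(ObjectA("x", 2L), ObjectB("x", 1L))

    // The map holds an older A and a newer B for the same id.
    val probe: Map[String, (ObjectA, Option[ObjectB])] =
      Map("x" -> ((ObjectA("x", 1L), Option(ObjectB("x", 3L)))))
    val expectedMixed: (ObjectA, Option[ObjectB]) =
      (ObjectA("x", 2L), Some(ObjectB("x", 3L)))
    assert(newestAandB(joined, probe) == expectedMixed)

    // No entry for the id: both values come from the joined object.
    val expectedFallback: (ObjectA, Option[ObjectB]) =
      (ObjectA("x", 2L), Some(ObjectB("x", 1L)))
    assert(newestAandB(joined, Map.empty) == expectedFallback)
  }
}
```

Because the function is pure, checks like these run in any test framework (or a plain main method) without an ActorSystem.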
Finally: combined answer
If I got your overall question right, then the solution could be implemented based on the above building blocks like this:
def myMethod(
  joinedSource: Source[JoinedObject, _],
  abProbePair: Future[Map[String, (ObjectA, Option[ObjectB])]]
): Source[(ObjectA, Option[ObjectB]), _] = {
  combineSourceAndFuture(joinedSource, abProbePair)(newestAandB)
}