Using below code I'm attempting to output the name and the sum of the ages for each person :
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
object CalculateMeanInStream extends App {
implicit val actorSystem = ActorSystem()
case class Person(name: String, age: Double)
val personSource = Source(List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2)))
val meanPrintSink = Sink.foreach[Double](println)
val printSink = Sink.foreach[Double](println)
def calculateMean(values: List[Double]): Double = {
values.sum / values.size
}
personSource.groupBy(maxSubstreams = 2 , s => s.name)
.map(m => m.age)
.reduce(_ _ )
.mergeSubstreams
.runForeach(println)
}
The output is :
2.0
100.0
Is there a way to keep the persons name as part of the reduce so that the following is produced in the output :
(2.0 , 2)
(100.0 , 1)
I've tried :
personSource.groupBy(maxSubstreams = 2 , s => s.name)
.reduce((x , y) => x.age y.age)
.mergeSubstreams
.runForeach(println)
but throws compiler error :
type mismatch;
found : Double
required: CalculateMeanInStream.Person
.reduce((x , y) => x.age y.age)
CodePudding user response:
There might be a more elegant way but I'd do it like this:
personSource
.groupBy(maxSubstreams = 2, s => s.name)
.map(x => x.name -> x.age)
.reduce { case ((a, b) , (_, d)) => (a, b d) }
.mergeSubstreams
.runForeach(println)
CodePudding user response:
personSource
.groupBy(maxSubstreams = 2, s => s.name)
.reduce((person1, person2) => Person(person1.name, person1.age person2.age))
.mergeSubstreams
.runForeach(println)