How to keep attribute after reduce?

Time:10-15

Using the code below, I'm trying to output the name and the sum of the ages for each person:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object CalculateMeanInStream extends App {

  implicit val actorSystem = ActorSystem()

  case class Person(name: String, age: Double)
  val personSource = Source(List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2)))
  val meanPrintSink = Sink.foreach[Double](println)
  val printSink = Sink.foreach[Double](println)

  def calculateMean(values: List[Double]): Double = {
    values.sum / values.size
  }

  personSource.groupBy(maxSubstreams = 2 , s  => s.name)
    .map(m => m.age)
    .reduce(_ + _)
    .mergeSubstreams
    .runForeach(println)

}

The output is:

2.0
100.0

Is there a way to keep the person's name as part of the reduce, so that the output looks like this:

(2.0 , 2)
(100.0 , 1)

I've tried:

  personSource.groupBy(maxSubstreams = 2 , s  => s.name)
    .reduce((x , y) => x.age + y.age)
    .mergeSubstreams
    .runForeach(println)

but this throws a compiler error:

type mismatch;
 found   : Double
 required: CalculateMeanInStream.Person
    .reduce((x , y) => x.age + y.age)

CodePudding user response:

There might be a more elegant way but I'd do it like this:

personSource
    .groupBy(maxSubstreams = 2, s => s.name)
    .map(x => x.name -> x.age)
    .reduce { case ((a, b) , (_, d)) => (a, b + d) }
    .mergeSubstreams
    .runForeach(println)
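
The reason the tuple helps: reduce combines two elements into one element of the same type, so anything that should survive the reduction (here the name) has to be part of the element itself. That is also why reducing Person values to a Double gives the type mismatch. Here is a minimal sketch of the same idea on a plain Scala List, with no Akka involved. The names ReduceKeepsName, sumWithName and totals are made up for illustration:

```scala
object ReduceKeepsName {
  case class Person(name: String, age: Double)

  // reduce needs an (A, A) => A function, so the name travels inside the tuple.
  // Assumes the group is non-empty and every person in it has the same name.
  def sumWithName(group: List[Person]): (String, Double) =
    group
      .map(p => (p.name, p.age))
      .reduce[(String, Double)]((x, y) => (x._1, x._2 + y._2))

  // Plays the role of groupBy + reduce + mergeSubstreams, but on a strict collection.
  // Sorted by name only to make the result order predictable.
  def totals(people: List[Person]): List[(String, Double)] =
    people.groupBy(_.name).values.map(sumWithName).toList.sortBy(_._1)
}
```

Calling ReduceKeepsName.totals on the five people from the question should give one (name, sum) pair per name.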

CodePudding user response:

personSource
    .groupBy(maxSubstreams = 2, s => s.name)
    .reduce((person1, person2) => Person(person1.name, person1.age + person2.age))
    .mergeSubstreams
    .runForeach(println)
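
Note that this version emits Person values, so it prints lines like Person(1,100.0) rather than the (100.0 , 1) pairs asked for. Adding a map after the reduce should give that shape. Below is a self-contained sketch, assuming Akka 2.6 or later is on the classpath (where an implicit ActorSystem is enough to materialize the stream). The object name SumAgesPerPerson and the 5 second timeout are my own choices, not anything from the question:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SumAgesPerPerson {
  case class Person(name: String, age: Double)

  val people = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  // Runs the stream and returns one (sum of ages, name) pair per distinct name.
  def run()(implicit system: ActorSystem): Seq[(Double, String)] = {
    val result = Source(people)
      .groupBy(maxSubstreams = 2, p => p.name)
      // Person in, Person out: the name is carried along by the element itself.
      .reduce((a, b) => Person(a.name, a.age + b.age))
      // Reshape only after the reduction is done.
      .map(p => (p.age, p.name))
      .mergeSubstreams
      .runWith(Sink.seq)
    // Blocking is acceptable for a small demo; the timeout is arbitrary.
    Await.result(result, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    try run().foreach(println)
    finally system.terminate()
  }
}
```

The order in which the merged substreams emit is not guaranteed, so the two lines may come out in either order.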