I'm attempting to apply various functions, such as mean and volatility, within a stream. From a list I create substreams using groupBy and sub-lists using grouped.
The code below calculates the sum in a stream:
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object CalculateSumInStream extends App {
  implicit val actorSystem: ActorSystem = ActorSystem()

  case class Person(name: String, age: Double)

  val personSource = Source(List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2)))

  personSource
    .groupBy(maxSubstreams = 2, s => s.name) // one substream per distinct name
    .grouped(10)                             // batch up to 10 elements...
    .mapConcat(identity)                     // ...and flatten them again (no effect on the result)
    .reduce((person1, person2) => Person(person1.name, person1.age + person2.age)) // sum the ages
    .mergeSubstreams
    .runForeach(println)
}
This produces:
Person(2,2.0)
Person(1,100.0)
Can 'non-stream' functions, for example a mean, be used to perform substream calculations?
For example, to compute the mean in plain Scala on the same List of Person, I can use:
object CalculateStats extends App {
  case class Person(name: String, age: Double)

  val personSource = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  def calculateMean(personList: List[Person]): (String, Double) = {
    val values = personList.map(m => m.age)
    (personList(0).name, values.sum / values.size)
  }

  personSource
    .groupBy(g => g.name)
    .map(m => calculateMean(m._2))
    .foreach(println)
}
But is this kind of 'pattern' possible in Akka Streams with Scala? By pattern I mean applying a function such as calculateMean, defined above, within the stream.
So, instead of .reduce((person1, person2) => ...), I would like to invoke calculateMean and produce the output:
(1,25.0)
(2,2.0)
Here 1 and 2 are the Person names, and 25.0 and 2.0 are the corresponding mean ages.
Answer:
If you do not mind the memory this takes (each substream keeps all of its elements in a list until it completes), you could simply fold the values into a list and then map that list through your function:
personSource
  .groupBy(maxSubstreams = 2, s => s.name)
  // collect each substream into a list (reversed order, which does not affect the mean)
  .fold(List.empty[Person])((list, person) => person :: list)
  .map(calculateMean)
  .mergeSubstreams
  .runForeach(println)
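The fold-then-map pattern accepts any List[Person] => result function, so the volatility mentioned in the question could be plugged in the same way. A minimal sketch, assuming "volatility" means the population standard deviation of the ages (finance code often uses the sample standard deviation of returns instead); calculateVolatility is a hypothetical helper shaped like calculateMean, shown on a plain List so it runs without Akka:

```scala
object CalculateVolatility extends App {
  case class Person(name: String, age: Double)

  val persons = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  // Hypothetical helper: population standard deviation of the ages in one group.
  // Assumes personList is non-empty, which holds for groups produced by groupBy.
  def calculateVolatility(personList: List[Person]): (String, Double) = {
    val values = personList.map(_.age)
    val mean = values.sum / values.size
    val variance = values.map(v => (v - mean) * (v - mean)).sum / values.size
    (personList.head.name, math.sqrt(variance))
  }

  persons
    .groupBy(_.name)
    .map { case (_, group) => calculateVolatility(group) }
    .foreach(println)
}
```

In the stream, it would simply take the place of calculateMean after the fold: .map(calculateVolatility).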
Alternatively, you could introduce an intermediate helper data type that carries a running total and count, so each substream only needs constant memory:
case class PersonAvg(name: String, total: Double, count: Int) {
  def avgAge: Double = total / count

  // combine two partial aggregates of the same person
  def +(that: PersonAvg): PersonAvg =
    copy(total = total + that.total, count = count + that.count)
}

personSource
  .groupBy(maxSubstreams = 2, s => s.name)
  .map(p => PersonAvg(p.name, p.age, 1))
  .reduce(_ + _)
  .map(p => p.name -> p.avgAge)
  .mergeSubstreams
  .runForeach(println)
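The same accumulator idea extends to volatility if the helper also carries a sum of squares. A minimal sketch, assuming population standard deviation; PersonStats and PersonStats.of are hypothetical names, and the combine step is exercised on a plain List so it runs without Akka:

```scala
object CalculateStatsIncrementally extends App {
  case class Person(name: String, age: Double)

  // Hypothetical accumulator: PersonAvg plus a running sum of squared ages.
  case class PersonStats(name: String, count: Int, total: Double, totalSq: Double) {
    def mean: Double = total / count
    // Var(X) = E[X^2] - E[X]^2, clamped at 0 to absorb tiny negative rounding errors
    def volatility: Double = math.sqrt(math.max(0.0, totalSq / count - mean * mean))
    // combine two partial aggregates of the same person
    def +(that: PersonStats): PersonStats =
      copy(count = count + that.count, total = total + that.total, totalSq = totalSq + that.totalSq)
  }

  object PersonStats {
    // lift a single Person into a one-element aggregate
    def of(p: Person): PersonStats = PersonStats(p.name, 1, p.age, p.age * p.age)
  }

  val persons = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  // Same map/reduce shape as the stream version, run here on a plain List.
  persons
    .groupBy(_.name)
    .map { case (_, group) => group.map(PersonStats.of).reduce(_ + _) }
    .foreach(s => println((s.name, s.mean, s.volatility)))
}
```

In the stream you would use .map(PersonStats.of), keep .reduce(_ + _), and finish with .map(s => (s.name, s.mean, s.volatility)). Note that E[X^2] - E[X]^2 can lose precision when values are large relative to their spread; Welford's online algorithm is the usual numerically stabler alternative.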
P.S.: you could of course also use an anonymous Tuple3 instead of introducing a dedicated case class, but IMHO a dedicated case class with clear names makes this much more readable and is worth the little bit of extra code.
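For comparison, here is a sketch of the Tuple3 variant, run on a plain List so it works without Akka. The tuple (name, total, count) stands in for PersonAvg, and meanByName is a hypothetical helper introduced only for this illustration. The positional _2 and _3 accessors show the readability cost mentioned above:

```scala
object CalculateMeanWithTuple extends App {
  case class Person(name: String, age: Double)

  val persons = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  // (name, total, count) plays the role of PersonAvg; the fields are only positional.
  def meanByName(ps: List[Person]): Map[String, Double] =
    ps.groupBy(_.name).map { case (name, group) =>
      val (_, total, count) = group
        .map(p => (p.name, p.age, 1))
        .reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3))
      name -> total / count
    }

  meanByName(persons).foreach(println)
}
```

In the stream, the same map and reduce lambdas would replace the PersonAvg steps, followed by .map { case (name, total, count) => name -> total / count }.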