Applying functions in stream

Time:10-24

I'm trying to apply various functions, such as mean and volatility, to the elements of a stream. From a list, I create sub-lists using grouped:

The code below calculates the sum per name in a stream:

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object CalculateSumInStream extends App {

  implicit val actorSystem = ActorSystem()

  case class Person(name: String, age: Double)
  val personSource = Source(List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2)))

  personSource
    .groupBy(maxSubstreams = 2, s => s.name)
    .grouped(10)
    .mapConcat(identity)
    .reduce((person1, person2) => {
      Person(person1.name, person1.age + person2.age)
    })
    .mergeSubstreams
    .runForeach(println)

}

produces :

Person(2,2.0)
Person(1,100.0)

Can 'non-stream' functions be used to perform substream calculations, for example the mean?

For example, the mean can be implemented in plain Scala on the same List of Person like this:

object CalculateStats extends App {

  case class Person(name: String, age: Double)

  val personSource = List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2))

  def calculateMean(personList: List[Person]): (String, Double) = {
    val values = personList.map(m => m.age)
    (personList(0).name, values.sum / values.size)
  }

  personSource
    .groupBy(g => g.name)
    .map(m => calculateMean(m._2))
    .foreach(println)

}

But is this kind of 'pattern' possible in Akka Streams for Scala? By pattern I mean applying a function such as calculateMean, defined above, inside the stream. So, instead of .reduce((person1, person2) => { ... }), the stream would invoke calculateMean and produce the output:

(1,25.0)
(2,2.0)

Where 1 and 2 are the Person name and 25.0 and 2.0 are the mean age for each person.

CodePudding user response:

If you do not care about the memory this takes (every element of a substream is held in a list until that substream completes), you could just fold the values into a list and then map that list through your function:

personSource
  .groupBy(maxSubstreams = 2, s => s.name)
  .fold(List.empty[Person])((list, person) => person :: list)
  .map(calculateMean)
  .mergeSubstreams
  .runForeach(println)
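
For reference, here is a self-contained sketch of the snippet above that can be compiled and run as is. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to supply the materializer); the object name MeanViaFold, the helper meansByName, and the use of Sink.seq with Await are illustrative choices, not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MeanViaFold {

  case class Person(name: String, age: Double)

  // Plain, non-stream function taken from the question.
  def calculateMean(personList: List[Person]): (String, Double) = {
    val values = personList.map(m => m.age)
    (personList.head.name, values.sum / values.size)
  }

  // Hypothetical helper: runs the stream and collects one (name, mean) pair per substream.
  def meansByName(people: List[Person])(implicit system: ActorSystem): Seq[(String, Double)] = {
    val result = Source(people)
      // maxSubstreams must be at least the number of distinct names,
      // otherwise the stream fails.
      .groupBy(maxSubstreams = 2, s => s.name)
      // Buffers every element of the substream until the substream completes.
      .fold(List.empty[Person])((list, person) => person :: list)
      .map(calculateMean)
      .mergeSubstreams
      .runWith(Sink.seq)
    // Blocking is acceptable in a small demo; avoid it in production code.
    Await.result(result, 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mean-via-fold")
    val people = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))
    // Substreams are merged in no guaranteed order, so sort before printing.
    try meansByName(people).sortBy(_._1).foreach(println)
    finally system.terminate()
  }
}
```

The fold prepends each element, so the list ends up in reverse arrival order; that does not matter for the mean, but it would for an order-sensitive function.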

Alternatively, you could introduce an intermediate helper data type that carries a running total and count, so that each substream only keeps a single value in memory:

case class PersonAvg(name: String, total: Double, count: Int) {
  def avgAge: Double = total / count

  // Combines two partial results for the same person.
  def +(that: PersonAvg): PersonAvg = {
    copy(total = total + that.total, count = count + that.count)
  }
}
  
personSource
  .groupBy(maxSubstreams = 2, s => s.name)
  .map(p => PersonAvg(p.name, p.age, 1))
  .reduce(_ + _)
  .map(p => p.name -> p.avgAge)
  .mergeSubstreams
  .runForeach(println)

P.S.: You could of course also use an anonymous Tuple3 instead of introducing a dedicated case class, but IMHO a dedicated case class with clear names makes this much more readable and is worth the little bit of extra code.
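
The question also mentions volatility. As a rough sketch, the same accumulate-then-finish idea can be extended by carrying the sum of squares as well. This assumes that "volatility" means the population standard deviation of the ages and that Akka 2.6 or later is used; AgeStats, statsByName, and VolatilityInStream are hypothetical names introduced here, not taken from the answer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object VolatilityInStream {

  case class Person(name: String, age: Double)

  // Hypothetical accumulator: count, sum and sum of squares are enough
  // to derive both the mean and the population standard deviation.
  case class AgeStats(name: String, count: Long, sum: Double, sumSq: Double) {
    def +(that: AgeStats): AgeStats =
      copy(count = count + that.count, sum = sum + that.sum, sumSq = sumSq + that.sumSq)

    def mean: Double = sum / count

    // variance = E[x^2] - (E[x])^2; clamped at 0 to absorb rounding error.
    // This simple formula can lose precision for large, closely spaced values.
    def stdDev: Double = math.sqrt(math.max(0.0, sumSq / count - mean * mean))
  }

  // Returns one (name, mean, stdDev) triple per distinct name.
  def statsByName(people: List[Person])(implicit system: ActorSystem): Seq[(String, Double, Double)] = {
    val result = Source(people)
      .groupBy(maxSubstreams = 2, p => p.name)
      .map(p => AgeStats(p.name, 1L, p.age, p.age * p.age))
      .reduce(_ + _)
      .map(s => (s.name, s.mean, s.stdDev))
      .mergeSubstreams
      .runWith(Sink.seq)
    // Blocking is acceptable in a small demo; avoid it in production code.
    Await.result(result, 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("volatility-in-stream")
    val people = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))
    // Prints (1,25.0,5.0) and (2,2.0,0.0) after sorting by name.
    try statsByName(people).sortBy(_._1).foreach(println)
    finally system.terminate()
  }
}
```

Because each substream only ever holds one AgeStats value, memory use stays constant per name regardless of how many elements arrive.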
