Suppose I have an RDD[(String, List[Int])]
, e.g. ("David", List(60, 70, 80)), ("John", List(70, 80, 90))
. How can I use reduceByKey in Scala to calculate the average of each List[Int]?
In the end, I want another RDD like ("David", 70), ("John", 80)
CodePudding user response:
You said you want an RDD[(String, Int)], but based on your starting point there could be duplicate keys:
List(("David", List(60, 70, 80)), ("David", List(70, 80, 90)))
Here is a solution where we group by key beforehand, so the average is computed over all elements for that key (note: we must divide by the number of flattened elements, not the number of lists):
val rddAvg: RDD[(String, Int)] =
  rdd1
    .groupByKey()
    .mapValues { (numbers: Iterable[List[Int]]) =>
      val all = numbers.flatten
      all.sum / all.size
    }
Check the result:
println {
  rddAvg
    .collect()
    .mkString(";")
}
Regarding your question: something based on reduceByKey
alone doesn't fit your case because of its type signature:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
V
in your case is List[Int]
, so you get back an RDD[(String, List[Int])]
, not an RDD[(String, Int)].
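To see the constraint concretely: reducing two List[Int] values can only ever produce another List[Int]. A minimal sketch of that merge step on plain Scala values (reduceByKey applies a function like this per key):

```scala
object ReduceSketch {
  def main(args: Array[String]): Unit = {
    // Two values for the same key, as reduceByKey would see them
    val a = List(60, 70, 80)
    val b = List(70, 80, 90)

    // func: (V, V) => V forces the result to stay a List[Int]
    val merged: List[Int] = a ++ b
    println(merged) // List(60, 70, 80, 70, 80, 90)

    // so the average must be computed in a separate mapValues step
    println(merged.sum / merged.length) // 75
  }
}
```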
A workaround is to concatenate the lists with reduceByKey
and compute the average afterwards:
val rddAvg: RDD[(String, Int)] =
  rdd1
    .reduceByKey(_ ++ _)                 // still RDD[(String, List[Int])]
    .mapValues(ns => ns.sum / ns.length) // now RDD[(String, Int)]
You could as well use something based on aggregateByKey
, since that function can return a different result type than the input values.
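For instance, aggregateByKey can carry a (sum, count) accumulator. A sketch of that accumulator logic on plain Scala collections, so it runs without a Spark cluster (on the RDD you would call rdd1.aggregateByKey((0, 0))(seqOp, combOp).mapValues { case (s, n) => s / n }):

```scala
object AggregateSketch {
  // zero value: (running sum, running count)
  val zero: (Int, Int) = (0, 0)

  // seqOp: fold one List[Int] value into the accumulator
  def seqOp(acc: (Int, Int), ns: List[Int]): (Int, Int) =
    (acc._1 + ns.sum, acc._2 + ns.length)

  // combOp: merge two partial accumulators (from different partitions)
  def combOp(a: (Int, Int), b: (Int, Int)): (Int, Int) =
    (a._1 + b._1, a._2 + b._2)

  def main(args: Array[String]): Unit = {
    // Duplicate keys on purpose, to show the global average per key
    val data = List(("David", List(60, 70, 80)), ("David", List(70, 80, 90)))

    // Simulate aggregateByKey locally: group, fold with seqOp, then divide
    val avg = data
      .groupBy(_._1)
      .map { case (k, vs) =>
        val (s, n) = vs.map(_._2).foldLeft(zero)(seqOp)
        k -> s / n
      }
    println(avg) // Map(David -> 75)
  }
}
```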
CodePudding user response:
val data1 = List(("David", List(60, 70, 80)), ("John", List(70, 80, 90)))
val rdd1 = sc.parallelize(data1)
println(rdd1.mapValues(value => value.sum.toDouble / value.size).collect.mkString(", "))