In scala, when I have a RDD list like :
List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6))
, and I want to calculate the avg
number for each character.
Like for a
it shows 3 times with the value count 1 2 6 = 9, so the result I expect is (a, 3)
.
In Scala I can write the code like below:
val newRdd = rdd.aggregateByKey((0,0))((t,v) => {(t._1 v, t._2 1)}, (t1, t2) => {(t1._1 t2._1, t1._2 t2._2)})
val result = newRdd.mapValues{
case(num, count) => {
num/count
}
}
So the result RDD will return the one I expected.
However, how could I explain the case(num/count) to pyspark?
I've tried:
avg_rdd_2 = avg_rdd_1.mapValues(lambda x, y : x / y)
But I will get the error below.
21/12/24 01:27:02 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 6) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main process() File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process serializer.dump_stream(out_iter, outfile) File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper return f(*args, **kwargs) File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 2278, in map_values_fn = lambda kv: (kv[0], f(kv[1])) TypeError: () missing 1 required positional argument: 'y'
CodePudding user response:
Let's say that we have an RDD:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)])
In your Scala example you first aggregated by key, the same could be done using:
groupByKey
new_rdd = rdd.groupByKey().mapValues(lambda x: sum(x) / len(x))
print(new_rdd.collect())
# [('b', 4.0), ('a', 3.0)]
aggregateByKey
new_rdd = rdd.aggregateByKey(
(0, 0),
lambda x, y: (x[0] y, x[1] 1),
lambda x, y: (x[0] y[0], x[1] y[1]),
)
result = new_rdd.mapValues(lambda x: x[0] / x[1])
print(result.collect())
# [('b', 4.0), ('a', 3.0)]
reduceByKey
result = (
rdd.mapValues(lambda x: (x, 1))
.reduceByKey(lambda x, y: (x[0] y[0], x[1] y[1]))
.mapValues(lambda x: x[0] / x[1])
)
print(result.collect())
# [('b', 4.0), ('a', 3.0)]