How to express Scala's case pattern matching in PySpark


In Scala, I have a pair RDD built from a list like List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)), and I want to calculate the average value for each key.

For example, "a" appears 3 times with values summing to 1 + 2 + 6 = 9, so the result I expect is (a, 3).

In Scala I can write code like the following:

val newRdd = rdd.aggregateByKey((0, 0))(
  (t, v) => (t._1 + v, t._2 + 1),
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
)
val result = newRdd.mapValues{
    case(num, count) => {
        num/count
    }
}

So the result RDD returns exactly what I expected.
However, how can I express the case (num, count) pattern in PySpark?

I've tried:

avg_rdd_2 = avg_rdd_1.mapValues(lambda x, y : x / y)

But I get the error below.

21/12/24 01:27:02 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 2278, in <lambda>
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
TypeError: <lambda>() missing 1 required positional argument: 'y'

CodePudding user response:

Let's say that we have an RDD:

from pyspark.sql import SparkSession


spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)])

In your Scala example you first aggregated by key; in PySpark the same can be done in any of the following ways:

groupByKey

new_rdd = rdd.groupByKey().mapValues(lambda x: sum(x) / len(x))

print(new_rdd.collect())
# [('b', 4.0), ('a', 3.0)]                                                        
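
As a side note, the value that mapValues receives here is an iterable of all values collected for the key (a pyspark ResultIterable, which supports len()), which is why sum and len work directly on it. If a named function reads more clearly than the lambda, here is a sketch of the same thing (mean_of_values is just an illustrative name):

def mean_of_values(values):
    # `values` is the iterable of all values grouped for one key by groupByKey
    values = list(values)
    return sum(values) / len(values)

new_rdd = rdd.groupByKey().mapValues(mean_of_values)
print(new_rdd.collect())
# [('b', 4.0), ('a', 3.0)]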

aggregateByKey

new_rdd = rdd.aggregateByKey(
    (0, 0),
    lambda x, y: (x[0] + y, x[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
result = new_rdd.mapValues(lambda x: x[0] / x[1])
print(result.collect())
# [('b', 4.0), ('a', 3.0)]                                                        
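
To come back to your original question: mapValues calls the supplied function with a single argument, the value, which here is a (sum, count) tuple, and Python 3 removed tuple unpacking in parameter lists, which is why lambda x, y: x / y fails. The closest counterpart of Scala's case (num, count) => num / count is to unpack inside the function body. A sketch, reusing the new_rdd defined above (average is just an illustrative name):

def average(value):
    num, count = value  # the Python analogue of Scala's `case (num, count)`
    return num / count

result = new_rdd.mapValues(average)
print(result.collect())
# [('b', 4.0), ('a', 3.0)]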

reduceByKey

result = (
    rdd.mapValues(lambda x: (x, 1))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .mapValues(lambda x: x[0] / x[1])
)

print(result.collect())
# [('b', 4.0), ('a', 3.0)]                                                        
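
One small difference from your Scala code: Python 3's / always returns a float (hence the 4.0 and 3.0 above), while the Scala version divides two Ints and yields (a, 3) and (b, 4). If you want integer results in PySpark as well, floor division with // is the closest match for non-negative values; a minimal tweak of the reduceByKey variant above (result_int is just an illustrative name):

result_int = (
    rdd.mapValues(lambda x: (x, 1))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .mapValues(lambda x: x[0] // x[1])  # integer division, matching Scala's Int / Int here
)
print(result_int.collect())
# [('b', 4), ('a', 3)]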