Pyspark: Grouping on a key in JSON and find mean of another key


I have a JSON file structured like so:

{"time":3,"points":6}
{"time":3,"points":2}
{"time":5,"points":1}

Using pyspark I am attempting to group by time and then find the mean of the points to return:

[(3, 4),
(5, 1)]

The setup is:

import json
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

dataset_json = sc.textFile("datafile.json")
dataset = dataset_json.map(lambda x: json.loads(x))

So far I can group by time, but I can't get the mean of the points.

def points_by_hour(dataset):
    return dataset.groupBy(lambda x: x['time']).mapValues(lambda x: x['points']).mean()

The grouping part works as expected, but I can't get it to return the mean. The function above fails, and I've tried a few variations; I don't find the error messages helpful.

The real data file is much larger, with more keys, hence pyspark.

CodePudding user response:

You could use the agg function available for dataframes:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.read.json("datafile.json")
df = df.groupBy('time').agg(F.avg(F.col('points')).alias('avgPoints'))
df.show()


+----+---------+
|time|avgPoints|
+----+---------+
|   3|      4.0|
|   5|      1.0|
+----+---------+

Note that, if you want to, you can convert the dataframe to an rdd and continue:

rdd = df.rdd
print(rdd.collect())

# [Row(time=3, avgPoints=4.0), Row(time=5, avgPoints=1.0)]
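
If you want exactly the list of (time, mean) tuples from the question rather than Row objects, one option (a small sketch reusing the same df from above) is to map each row before collecting:

result = df.rdd.map(lambda row: (row['time'], row['avgPoints'])).collect()
print(result)

# [(3, 4.0), (5, 1.0)]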

But here is a solution using rdd only:

from pyspark import SparkContext
import json

sc = SparkContext.getOrCreate()
dataset_json = sc.textFile("datafile.json")
dataset = dataset_json.map(lambda x: json.loads(x))

def points_by_hour(dataset):
    return dataset.map(lambda x: (x['time'], (x['points'], 1))) \
        .reduceByKey(lambda acc, val: (acc[0] + val[0], acc[1] + val[1])) \
        .map(lambda x: (x[0], x[1][0]/x[1][1]))

print(points_by_hour(dataset).collect())

# [(3, 4.0), (5, 1.0)]

Explanation: the first map pairs each record's points with a count of 1, reduceByKey sums the points and the counts per key, and the final map divides sum by count to get the average.
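
As a variation on the same sum-and-count idea, aggregateByKey can carry the (sum, count) accumulator itself, so you don't need to build the (points, 1) pairs in a separate map. A minimal sketch, assuming the same dataset rdd of parsed dicts as above (points_by_hour_agg is just an illustrative name):

def points_by_hour_agg(dataset):
    # Accumulator is (running sum of points, running count of records).
    return dataset.map(lambda x: (x['time'], x['points'])) \
        .aggregateByKey((0, 0),
                        lambda acc, points: (acc[0] + points, acc[1] + 1),
                        lambda a, b: (a[0] + b[0], a[1] + b[1])) \
        .mapValues(lambda sum_count: sum_count[0] / sum_count[1])

print(points_by_hour_agg(dataset).collect())

# [(3, 4.0), (5, 1.0)]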

CodePudding user response:

Searching further, I found reduceByKey is more appropriate here.

import json
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

dataset_json = sc.textFile("datafile.json")
dataset = dataset_json.map(lambda x: json.loads(x))

def points_by_hour(dataset):
    return dataset.map(lambda x: (x['time'], x['points'])).reduceByKey(
        lambda a, b: (a + b)/2)

points_by_hour(dataset).collect()

# [(3, 4.0), (5, 1)]

The average of any two numbers is their sum divided by 2, hence the simple lambda. Keep in mind, though, that reduceByKey applies the lambda pairwise, so this only equals the true mean when a key has at most two values; with three or more values per key, averaging intermediate averages generally does not give the overall mean, and the sum-and-count approach above is the safer choice.
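
To see the limitation concretely (a made-up example, not from the original data): with three points under one key, pairwise averaging drifts away from the true mean, and the exact result depends on how Spark happens to pair the values.

three = sc.parallelize([(7, 1), (7, 2), (7, 9)])

# Pairwise averaging, e.g. ((1 + 2)/2 + 9)/2 = 5.25, while the true mean is 4.0.
print(three.reduceByKey(lambda a, b: (a + b)/2).collect())

# Sum-and-count gives the correct mean regardless of pairing order.
correct = three.mapValues(lambda p: (p, 1)) \
    .reduceByKey(lambda acc, val: (acc[0] + val[0], acc[1] + val[1])) \
    .mapValues(lambda s: s[0] / s[1])
print(correct.collect())  # [(7, 4.0)]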
