extract value from a list of json in pyspark


I have a dataframe where one column holds a list of JSON objects. I want to extract a specific value (score) from each object and create independent columns.

raw_data = [{"user_id" : 1234, "col" : [{"id":14577120145280,"score":64.71,"Elastic_position":0},{"id":14568530280240,"score":88.53,"Elastic_position":1},{"id":14568530119661,"score":63.75,"Elastic_position":2},{"id":14568530205858,"score":62.79,"Elastic_position":3},{"id":14568530414899,"score":60.88,"Elastic_position":4}]}]

import pandas as pd

df = pd.DataFrame.from_dict(raw_data)

I want to explode my result dataframe as:

(expected output: one column per Elastic_position, holding that element's score)

   user_id      0      1      2      3      4
0     1234  64.71  88.53  63.75  62.79  60.88

CodePudding user response:

Assuming your JSON file looks like this

# a.json
# {
#     "user_id" : 1234,
#     "col" : [
#         {"id":14577120145280,"score":64.71,"Elastic_position":0},
#         {"id":14568530280240,"score":88.53,"Elastic_position":1},
#         {"id":14568530119661,"score":63.75,"Elastic_position":2},
#         {"id":14568530205858,"score":62.79,"Elastic_position":3},
#         {"id":14568530414899,"score":60.88,"Elastic_position":4}
#     ]
# }

You can read it (with multiLine=True, since each JSON object spans several lines), flatten it, then pivot it like so:

from pyspark.sql import functions as F, types as T

schema = T.StructType([
    T.StructField('user_id', T.IntegerType()),
    T.StructField('col', T.ArrayType(T.StructType([
        T.StructField('id', T.LongType()),
        T.StructField('score', T.DoubleType()),
        T.StructField('Elastic_position', T.IntegerType()),
    ]))),
])

df = spark.read.json('a.json', multiLine=True, schema=schema)
df.show(10, False)
# +-------+--------------------------------------------------------------------------------------------------------------------------------------------+
# |user_id|col                                                                                                                                         |
# +-------+--------------------------------------------------------------------------------------------------------------------------------------------+
# |1234   |[{14577120145280, 64.71, 0}, {14568530280240, 88.53, 1}, {14568530119661, 63.75, 2}, {14568530205858, 62.79, 3}, {14568530414899, 60.88, 4}]|
# +-------+--------------------------------------------------------------------------------------------------------------------------------------------+


df.printSchema()
# root
#  |-- user_id: integer (nullable = true)
#  |-- col: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- id: long (nullable = true)
#  |    |    |-- score: double (nullable = true)
#  |    |    |-- Elastic_position: integer (nullable = true)

(df
    .select('user_id', F.explode('col'))  # explode's output column is named `col` by default
    .groupBy('user_id')
    .pivot('col.Elastic_position')        # one output column per position
    .agg(F.first('col.score'))
    .show(10, False)
)

# Output
# +-------+-----+-----+-----+-----+-----+
# |user_id|0    |1    |2    |3    |4    |
# +-------+-----+-----+-----+-----+-----+
# |1234   |64.71|88.53|63.75|62.79|60.88|
# +-------+-----+-----+-----+-----+-----+
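
As a side note, the file round-trip is not required when the data is already in memory, as in the question: spark.createDataFrame accepts the list of dicts together with the schema above. And if the distinct Elastic_position values are known in advance, passing them to pivot lets Spark skip the extra pass it otherwise runs to discover them. A minimal sketch, assuming a SparkSession named spark and reusing raw_data and schema from above:

# Nested dicts map onto the struct fields by name.
sdf = spark.createDataFrame(raw_data, schema=schema)

(sdf
    .select('user_id', F.explode('col'))
    .groupBy('user_id')
    .pivot('col.Elastic_position', [0, 1, 2, 3, 4])  # explicit values skip the discovery pass
    .agg(F.first('col.score'))
    .show(10, False)
)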

CodePudding user response:

Try using pd.Series.explode with groupby:

df = pd.DataFrame.from_dict(raw_data).explode('col')
(df.assign(col=df['col'].str['score'])
   .groupby('user_id')
   .agg(list)
   .apply(lambda x: (y := x.explode())
                    .set_axis(y.index + '_' + y.groupby(level=0).cumcount().astype(str)),
          axis=1)
   .reset_index())

   user_id  col_0  col_1  col_2  col_3  col_4
0     1234  64.71  88.53  63.75  62.79  60.88

It first constructs a dataframe and explodes the col column, then extracts each score, groups by the duplicated user_ids, performs another explode inside apply to reshape from long to wide, and finally appends the suffixes 0 to 4 with cumcount. A more readable equivalent is sketched below.
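
If the one-liner is hard to follow, the same wide result can be built step by step with cumcount and a plain pivot. A minimal sketch reusing raw_data from the question; the tmp/out names are just for illustration:

tmp = pd.DataFrame.from_dict(raw_data).explode('col').reset_index(drop=True)
tmp['score'] = tmp['col'].str['score']          # pull the score out of each dict
tmp['pos'] = tmp.groupby('user_id').cumcount()  # 0..4 within each user, in list order
out = (tmp.pivot(index='user_id', columns='pos', values='score')
          .add_prefix('col_')
          .rename_axis(columns=None)
          .reset_index())
#    user_id  col_0  col_1  col_2  col_3  col_4
# 0     1234  64.71  88.53  63.75  62.79  60.88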
