Collect pyspark dataframe into list by value


I have the following dataframe:

+-------------------+-------------------+---------+--------------+--------+
|            fs_date|            ss_date|fs_origin|fs_destination|   price|
+-------------------+-------------------+---------+--------------+--------+
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           AUH|681.0715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           AUH|  406.46|
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           BOM|545.7715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           BOM| 372.435|
+-------------------+-------------------+---------+--------------+--------+

I want to collect the whole dataframe into a dictionary that maps each 'fs_destination' to a list of JSON strings, like so:

{ "AUH":
  ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'],
"BOM":
  ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}', 
   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}']
}

Thanks!

CodePudding user response:

Be careful when you collect data to the driver; make sure the driver has enough memory to hold the result.

import pyspark.sql.functions as f


# Serialize each row to a JSON string, then gather the strings per destination
df_output = (df
             .groupBy("fs_destination")
             .agg(f.collect_list(f.to_json(f.struct(*df.columns))).alias("JSON")))

# Build the final dict on the driver; toLocalIterator streams one partition at a time
output = {row["fs_destination"]: row["JSON"] for row in df_output.toLocalIterator()}

Output

{
  'AUH': [
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'
  ],
  'BOM': [
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}'
  ]
}
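
For completeness, a minimal sketch to reproduce the sample dataframe and try the snippet above (column names taken from the question; the timestamps are kept as plain strings here so to_json emits them exactly as shown):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows from the question
data = [
    ("2022-06-01T00:00:00", "2022-06-02T00:00:00", "TLV", "AUH", 681.0715),
    ("2022-06-01T00:00:00", "2022-06-03T00:00:00", "TLV", "AUH", 406.46),
    ("2022-06-01T00:00:00", "2022-06-02T00:00:00", "TLV", "BOM", 545.7715),
    ("2022-06-01T00:00:00", "2022-06-03T00:00:00", "TLV", "BOM", 372.435),
]
df = spark.createDataFrame(
    data, ["fs_date", "ss_date", "fs_origin", "fs_destination", "price"]
)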

CodePudding user response:

Try this:

import pyspark.sql.functions as f

# Add a JSON-string column for each row, then collect the strings per destination
df = (
    df
    .withColumn('jsonValue', f.to_json(f.struct(*df.columns)))
    .groupBy('fs_destination')
    .agg(f.collect_list('jsonValue').alias('jsonValues'))
)

# Pull the grouped rows to the driver and build the destination -> list mapping
df_collected = df.collect()

output = dict(zip(
    [element['fs_destination'] for element in df_collected],
    [element['jsonValues'] for element in df_collected]
))
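
If the JSON strings need to be used as Python objects downstream, they can be parsed on the driver; a small sketch, assuming the output dict built above:

import json

# Parse the JSON strings for one destination back into dicts
auh_rows = [json.loads(s) for s in output['AUH']]
print(auh_rows[0]['price'])  # 681.0715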