I have the following dataframe:
+-------------------+-------------------+---------+--------------+--------+
|            fs_date|            ss_date|fs_origin|fs_destination|   price|
+-------------------+-------------------+---------+--------------+--------+
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           AUH|681.0715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           AUH|  406.46|
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           BOM|545.7715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           BOM| 372.435|
+-------------------+-------------------+---------+--------------+--------+
I want to collect the whole dataframe into a dictionary that maps each 'fs_destination' to a list of JSON strings, like so:
{ "AUH":
['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
'{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'],
"BOM":
['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
'{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}']
}
Thanks!
CodePudding user response:
Be careful when collecting data to the driver; make sure it has enough memory to hold the whole result.
import pyspark.sql.functions as f

# Serialize every row to a JSON string, then gather the strings per destination
df_output = (df
             .groupBy("fs_destination")
             .agg(f.collect_list(f.to_json(f.struct(*df.columns))).alias("JSON")))

# Build the dict on the driver; toLocalIterator() fetches one partition at a time
output = {row["fs_destination"]: row["JSON"] for row in df_output.toLocalIterator()}
Output
{
'AUH': [
'{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
'{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'
],
'BOM': [
'{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
'{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}'
]
}
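If you later need native Python dicts rather than JSON strings, the collected values can be parsed on the driver with the standard-library json module. A minimal sketch, assuming output is the dict built above:
import json

# Parse each JSON string back into a Python dict, keeping the per-destination grouping
parsed = {dest: [json.loads(s) for s in rows] for dest, rows in output.items()}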
CodePudding user response:
Try this:
import pyspark.sql.functions as f

# Pack each row into a JSON string, then collect the strings per destination
df_grouped = (
    df
    .withColumn('jsonValue', f.to_json(f.struct(*df.columns)))
    .groupBy('fs_destination')
    .agg(f.collect_list('jsonValue').alias('jsonValues'))
)

# collect() pulls all grouped rows to the driver in one go
df_collected = df_grouped.collect()

output = dict(zip(
    [element['fs_destination'] for element in df_collected],
    [element['jsonValues'] for element in df_collected]
))
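The final dict(zip(...)) step can also be written as a single dict comprehension over the collected rows, which avoids iterating the list twice:
output = {row['fs_destination']: row['jsonValues'] for row in df_collected}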