I have the following dataframe:
+-------------------+-------------------+---------+--------------+--------+
|            fs_date|            ss_date|fs_origin|fs_destination|   price|
+-------------------+-------------------+---------+--------------+--------+
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           AUH|681.0715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           AUH|  406.46|
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           BOM|545.7715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           BOM| 372.435|
+-------------------+-------------------+---------+--------------+--------+
I want to collect the whole dataframe into a dictionary keyed by 'fs_destination', where each value is the list of row dictionaries for that destination, like so:
{ "AUH":
[{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715},
{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}],
"BOM":
[{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715},
{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}]
}
I don't want the result as a JSON string, only as a regular Python dictionary. Thanks!
CodePudding user response:
We can create JSON strings within PySpark and then use json.loads() to convert them to dictionaries.
# Distinct destinations present in the data; these become the pivot columns.
fs_dest_list = [
    row.fs_destination
    for row in data_sdf.select('fs_destination').distinct().collect()
]

# Build one JSON object string for the whole dataframe:
#   1. serialise every row to a JSON string,
#   2. pivot on destination so each destination column holds the list of its
#      rows' JSON strings (the constant group key collapses everything into
#      a single output row),
#   3. serialise that single row to JSON.
# `to_json()` produces a string column, so the collected cell is a `str`.
fs_dict_str = (
    data_sdf
    .withColumn(
        'all_col_json_str',
        func.to_json(func.struct(*[func.col(c) for c in data_sdf.columns])),
    )
    .groupBy(func.lit(1).alias('gk'))
    .pivot('fs_destination', values=fs_dest_list)
    .agg(func.collect_list('all_col_json_str').alias('json_str'))
    .withColumn(
        'dest_json_str',
        func.to_json(func.struct(*[func.col(d) for d in fs_dest_list])),
    )
    .select('dest_json_str')
    .collect()[0][0]
)
# '{"AUH":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":681.0715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":406.46}"],
# "BOM":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":545.7715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":372.435}"]}'
Use json.loads() to create a dictionary from the JSON string:
# Parse the outer JSON object into a regular Python dict keyed by destination.
fs_dict = json.loads(fs_dict_str)
# result -- note that every list element is still a JSON *string*, not a dict
# {'AUH': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
# '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'],
# 'BOM': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
# '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}']}
This is still not enough, because the elements of each list are still JSON strings (a result of to_json). Loop over the dictionary and convert each of those strings into a dictionary with json.loads.
# Decode each destination's list of JSON strings into a list of dicts.
# Only values of existing keys are reassigned, so updating the dict while
# iterating over it is safe (its size never changes).
for dest, json_rows in fs_dict.items():
    fs_dict[dest] = [json.loads(json_row) for json_row in json_rows]
# {'AUH': [{'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'AUH',
# 'fs_origin': 'TLV',
# 'price': 681.0715,
# 'ss_date': '2022-06-02T00:00:00'},
# {'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'AUH',
# 'fs_origin': 'TLV',
# 'price': 406.46,
# 'ss_date': '2022-06-03T00:00:00'}],
# 'BOM': [{'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'BOM',
# 'fs_origin': 'TLV',
# 'price': 545.7715,
# 'ss_date': '2022-06-02T00:00:00'},
# {'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'BOM',
# 'fs_origin': 'TLV',
# 'price': 372.435,
# 'ss_date': '2022-06-03T00:00:00'}]}
# Sanity checks: the result should be a dict of lists of dicts.
# In a REPL/notebook this bare expression echoes the container type.
type(fs_dict)
# dict

# Every value of the outer dict is a list ...
for rows in fs_dict.values():
    print(type(rows))
# <class 'list'>
# <class 'list'>

# ... and every element of those lists is a dict.
for rows in fs_dict.values():
    print([type(row) for row in rows])
# [<class 'dict'>, <class 'dict'>]
# [<class 'dict'>, <class 'dict'>]
CodePudding user response:
Thanks for your answers. I found this solution (mentioned in the comments) to be the best for me:
replace the last line of the answer to "Collect pyspark dataframe into list by value"
with this row: {row["fs_destination"]: list(map(lambda x: json.loads(x), row["JSON"])) for row in df_output.toLocalIterator()}