Home > Back-end >  Collect pyspark dataframe into list of dictionaries by value
Collect pyspark dataframe into list of dictionaries by value

Time:08-04

I have the following dataframe:

 ------------------- ------------------- --------- -------------- -------- 
|            fs_date|            ss_date|fs_origin|fs_destination|   price|
 ------------------- ------------------- --------- -------------- -------- 
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           AUH|681.0715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           AUH|  406.46|
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           BOM|545.7715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           BOM| 372.435|

I want to collect the whole dataframe into a list of dictionaries, partitiond by 'fs_destination' like so:

{ "AUH":
  [{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715},
   {"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}],
"BOM":
  [{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}, 
   {"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}]
}

I don't want it to be in JSON, only regular python dictionary. Thanks!

CodePudding user response:

We can create json strings within pyspark, and then use json.loads() to convert them to dictionary.

# create a list of destinations available in the data
fs_dest_list = data_sdf.select('fs_destination').distinct().rdd. \
    map(lambda x: x.fs_destination). \
    collect()

# create a dictionary style string -- `to_json()` returns a string and so will the `collect()`
fs_dict_str = data_sdf. \
    withColumn('all_col_json_str', func.to_json(func.struct(*[func.col(k) for k in data_sdf.columns]))). \
    groupBy(func.lit(1).alias('gk')). \
    pivot('fs_destination', values=fs_dest_list). \
    agg(func.collect_list('all_col_json_str').alias('json_str')). \
    withColumn('dest_json_str', func.to_json(func.struct(*[func.col(k) for k in fs_dest_list]))). \
    select('dest_json_str'). \
    collect()[0][0]

# '{"AUH":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":681.0715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":406.46}"],
#  "BOM":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":545.7715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":372.435}"]}'

Use json.loads() to create a dictionary from the json string

fs_dict = json.loads(fs_dict_str)

# result
# {'AUH': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
#   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'],
#  'BOM': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
#   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}']}

It is still not enough as the dictionaries within the list are still json strings as a result of the to_json. Use a for loop on the dictionary keys to create dictionaries out of them using json.loads.

for k in fs_dict.keys():
    fs_dict[k] = [json.loads(s) for s in fs_dict[k]]

# {'AUH': [{'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'AUH',
#    'fs_origin': 'TLV',
#    'price': 681.0715,
#    'ss_date': '2022-06-02T00:00:00'},
#   {'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'AUH',
#    'fs_origin': 'TLV',
#    'price': 406.46,
#    'ss_date': '2022-06-03T00:00:00'}],
#  'BOM': [{'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'BOM',
#    'fs_origin': 'TLV',
#    'price': 545.7715,
#    'ss_date': '2022-06-02T00:00:00'},
#   {'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'BOM',
#    'fs_origin': 'TLV',
#    'price': 372.435,
#    'ss_date': '2022-06-03T00:00:00'}]}

type(fs_dict)
# dict

for k in fs_dict.keys():
    print(type(fs_dict[k]))
# <class 'list'>
# <class 'list'>

for k in fs_dict.keys():
    print([type(ele) for ele in fs_dict[k]])
# [<class 'dict'>, <class 'dict'>]
# [<class 'dict'>, <class 'dict'>]

CodePudding user response:

Thanks for your answers, I found this solution (mentiond in the comments) to be the best for me:

replace the last row from this answer: Collect pyspark dataframe into list by value

with this row: {row["fs_destination"]: list(map(lambda x: json.loads(x), row["JSON"])) for row in df_output.toLocalIterator()}

  • Related