How can I convert multiple columns to maps after grouping and pivoting in PySpark? The code below is not working as expected.
Input dataframe:
ID | TYPE | Order | Address | Phone |
---|---|---|---|---|
1 | A | Primary | abc | 111 |
1 | B | Primary | def | 222 |
1 | A | Secondary | ghi | 333 |
1 | B | Secondary | jkl | 444 |
2 | A | Primary | mno | 555 |
2 | A | Secondary | pqr | 666 |
2 | B | Primary | stu | 777 |
2 | B | Secondary | vwy | 888 |
Expected Output dataframe:
ID | Primary_A_attributes | Primary_B_attributes | Secondary_A_attributes | Secondary_B_attributes |
---|---|---|---|---|
1 | {"Address" : "abc", "phone" : "111"} | {"Address" : "def", phone" : "222"} | {"Address" : "ghi", "phone" : "333"} | {"Address" : "jkl", phone" : "444"} |
2 | {"Address":"mno", "phone":"555" | {"Address" : "pqr", phone" : "666"} | {"Address":"stu", "phone":"777" | {"Address" : "vwy", phone" : "888"} |
Code used:
df.withColumn("collection", F.upper(F.concat_ws('_attributes_', 'order', 'type')))\
    .groupBy('id').pivot("collection")\
    .agg(F.create_map(F.lit("Address"), F.col("Address"),
                      F.lit("phone"), F.col("phone"))).display()
CodePudding user response:
You can first create the maps and then use first() to fill in the values within the pivot.
from pyspark.sql import functions as func

data_sdf. \
    withColumn('collection', func.lower(func.concat_ws('_attributes_', 'order', 'type'))). \
    withColumn('adr_ph_struct',
               # an array of (key, value) structs built from the address and phone columns
               func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in ['address', 'phone']])
               ). \
    withColumn('adr_ph_map', func.map_from_entries('adr_ph_struct')). \
    groupBy('id'). \
    pivot('collection'). \
    agg(func.first('adr_ph_map')). \
    show(truncate=False)
# +---+------------------------------+------------------------------+------------------------------+------------------------------+
# |id |primary_attributes_a          |primary_attributes_b          |secondary_attributes_a        |secondary_attributes_b        |
# +---+------------------------------+------------------------------+------------------------------+------------------------------+
# |1  |{address -> abc, phone -> 111}|{address -> def, phone -> 222}|{address -> ghi, phone -> 333}|{address -> jkl, phone -> 444}|
# |2  |{address -> mno, phone -> 555}|{address -> stu, phone -> 777}|{address -> pqr, phone -> 666}|{address -> vwy, phone -> 888}|
# +---+------------------------------+------------------------------+------------------------------+------------------------------+
The data just before the groupBy().pivot() would look like this:
# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
# |id |type|order    |address|phone|collection            |adr_ph_struct                 |adr_ph_map                    |
# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
# |1  |A   |Primary  |abc    |111  |primary_attributes_a  |[{address, abc}, {phone, 111}]|{address -> abc, phone -> 111}|
# |1  |B   |Primary  |def    |222  |primary_attributes_b  |[{address, def}, {phone, 222}]|{address -> def, phone -> 222}|
# |1  |A   |Secondary|ghi    |333  |secondary_attributes_a|[{address, ghi}, {phone, 333}]|{address -> ghi, phone -> 333}|
# |1  |B   |Secondary|jkl    |444  |secondary_attributes_b|[{address, jkl}, {phone, 444}]|{address -> jkl, phone -> 444}|
# |2  |A   |Primary  |mno    |555  |primary_attributes_a  |[{address, mno}, {phone, 555}]|{address -> mno, phone -> 555}|
# |2  |A   |Secondary|pqr    |666  |secondary_attributes_a|[{address, pqr}, {phone, 666}]|{address -> pqr, phone -> 666}|
# |2  |B   |Primary  |stu    |777  |primary_attributes_b  |[{address, stu}, {phone, 777}]|{address -> stu, phone -> 777}|
# |2  |B   |Secondary|vwy    |888  |secondary_attributes_b|[{address, vwy}, {phone, 888}]|{address -> vwy, phone -> 888}|
# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
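For comparison, the create_map approach from the question also works once the missing comma between the key-value pairs is added and first() is used as the pivot aggregate. A minimal sketch, assuming the same data_sdf input; the collection and attributes names here are only illustrative:

from pyspark.sql import functions as func

# build a Primary_A / Secondary_B style label, create the map per row,
# then take the first (only) map per (id, collection) cell after pivoting
data_sdf. \
    withColumn('collection', func.concat_ws('_', 'order', 'type')). \
    withColumn('attributes',
               func.create_map(func.lit('Address'), func.col('address'),
                               func.lit('phone'), func.col('phone'))
               ). \
    groupBy('id'). \
    pivot('collection'). \
    agg(func.first('attributes')). \
    show(truncate=False)

The pivoted columns then come out as Primary_A, Primary_B, and so on; the _attributes suffix can be appended afterwards with a select/alias pass, as in the answer below.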
CodePudding user response:
Desired solution:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
data =[("1","A","Primary","abc","111"),
("1","B","Primary","def","222"),
("1","A","Secondary","ghi","333"),
("1","B","Secondary","jkl","444"),
("2","A","Primary","mno","555"),
("2","A","Secondary","pqr","666"),
("2","B","Primary","stu","777"),
("2","B","Secondary","vwy","888")]
schema=["ID","TYPE","Order","Address","Phone"]
df_source = spark.createDataFrame(data,schema)
# keys for the map; the values are collected per (id, collection) group
expr_array = F.array(F.lit("Address"), F.lit("Phone"))
df_fnl = df_source.withColumn("collection", F.concat_ws('_', 'order', 'type'))\
    .groupBy(["id"]).pivot("collection")\
    .agg(F.map_from_arrays(expr_array, F.array_union(F.collect_list("Address"), F.collect_list("Phone"))))
# append the "_attributes" suffix to every pivoted column
df_fnl.select([F.col(col).alias(col + "_attributes") if col != 'id' else col for col in df_fnl.columns]).show(10, 0)
Output:
+---+------------------------------+------------------------------+------------------------------+------------------------------+
|id |Primary_A_attributes          |Primary_B_attributes          |Secondary_A_attributes        |Secondary_B_attributes        |
+---+------------------------------+------------------------------+------------------------------+------------------------------+
|1  |{Address -> abc, Phone -> 111}|{Address -> def, Phone -> 222}|{Address -> ghi, Phone -> 333}|{Address -> jkl, Phone -> 444}|
|2  |{Address -> mno, Phone -> 555}|{Address -> stu, Phone -> 777}|{Address -> pqr, Phone -> 666}|{Address -> vwy, Phone -> 888}|
+---+------------------------------+------------------------------+------------------------------+------------------------------+
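If the cells need to hold literal JSON strings (as written in the expected output) rather than MapType columns, the pivoted map columns can be serialized with to_json. A minimal sketch building on df_fnl above; the df_json name is only illustrative:

# serialize each pivoted map column to a JSON string while renaming it
df_json = df_fnl.select(
    [F.to_json(F.col(col)).alias(col + "_attributes") if col != 'id' else F.col(col)
     for col in df_fnl.columns])
df_json.show(10, 0)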