Pyspark how to convert columns to maps after grouping and pivoting

How do I convert multiple columns to maps after grouping and pivoting in PySpark? The code below is not working as expected.

Input dataframe:

ID  TYPE  Order      Address  Phone
1   A     Primary    abc      111
1   B     Primary    def      222
1   A     Secondary  ghi      333
1   B     Secondary  jkl      444
2   A     Primary    mno      555
2   A     Secondary  pqr      666
2   B     Primary    stu      777
2   B     Secondary  vwy      888

Expected Output dataframe:

ID  Primary_A_attributes                Primary_B_attributes                Secondary_A_attributes              Secondary_B_attributes
1   {"Address": "abc", "phone": "111"}  {"Address": "def", "phone": "222"}  {"Address": "ghi", "phone": "333"}  {"Address": "jkl", "phone": "444"}
2   {"Address": "mno", "phone": "555"}  {"Address": "stu", "phone": "777"}  {"Address": "pqr", "phone": "666"}  {"Address": "vwy", "phone": "888"}

Code used:

df.withColumn("collection", F.upper(F.concat_ws('_attributes_', 'order', 'type')))\
    .groupBy('id').pivot("collection")\
    .agg(F.create_map(F.lit("Address"), F.col("Address"),
                      F.lit("phone"), F.col("phone"))).display()

CodePudding user response:

You can create the maps first and then use first() as the aggregate to fill in the pivoted values.

from pyspark.sql import functions as func

data_sdf. \
    withColumn('collection', func.lower(func.concat_ws('_attributes_', 'order', 'type'))). \
    withColumn('adr_ph_struct',
               # one (key, value) struct per attribute column
               func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in ['address', 'phone']])
               ). \
    withColumn('adr_ph_map', func.map_from_entries('adr_ph_struct')). \
    groupBy('id'). \
    pivot('collection'). \
    agg(func.first('adr_ph_map')). \
    show(truncate=False)

# +---+------------------------------+------------------------------+------------------------------+------------------------------+
# |id |primary_attributes_a          |primary_attributes_b          |secondary_attributes_a        |secondary_attributes_b        |
# +---+------------------------------+------------------------------+------------------------------+------------------------------+
# |1  |{address -> abc, phone -> 111}|{address -> def, phone -> 222}|{address -> ghi, phone -> 333}|{address -> jkl, phone -> 444}|
# |2  |{address -> mno, phone -> 555}|{address -> stu, phone -> 777}|{address -> pqr, phone -> 666}|{address -> vwy, phone -> 888}|
# +---+------------------------------+------------------------------+------------------------------+------------------------------+

The data before the groupBy().pivot() would look like this:

# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
# |id |type|order    |address|phone|collection            |adr_ph_struct                 |adr_ph_map                    |
# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
# |1  |A   |Primary  |abc    |111  |primary_attributes_a  |[{address, abc}, {phone, 111}]|{address -> abc, phone -> 111}|
# |1  |B   |Primary  |def    |222  |primary_attributes_b  |[{address, def}, {phone, 222}]|{address -> def, phone -> 222}|
# |1  |A   |Secondary|ghi    |333  |secondary_attributes_a|[{address, ghi}, {phone, 333}]|{address -> ghi, phone -> 333}|
# |1  |B   |Secondary|jkl    |444  |secondary_attributes_b|[{address, jkl}, {phone, 444}]|{address -> jkl, phone -> 444}|
# |2  |A   |Primary  |mno    |555  |primary_attributes_a  |[{address, mno}, {phone, 555}]|{address -> mno, phone -> 555}|
# |2  |A   |Secondary|pqr    |666  |secondary_attributes_a|[{address, pqr}, {phone, 666}]|{address -> pqr, phone -> 666}|
# |2  |B   |Primary  |stu    |777  |primary_attributes_b  |[{address, stu}, {phone, 777}]|{address -> stu, phone -> 777}|
# |2  |B   |Secondary|vwy    |888  |secondary_attributes_b|[{address, vwy}, {phone, 888}]|{address -> vwy, phone -> 888}|
# +---+----+---------+-------+-----+----------------------+------------------------------+------------------------------+
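
Note that these pivoted column names (primary_attributes_a, etc.) differ from the Primary_A_attributes format asked for in the question. A hypothetical rename pass over the pivoted result (called pivoted_sdf here; the chain above would need to be assigned to it rather than ending in show()) could restore that convention:

# hypothetical helper: "primary_attributes_a" -> "Primary_A_attributes"
def to_requested_name(c):
    order, _, typ = c.split('_')    # e.g. 'primary', 'attributes', 'a'
    return '{}_{}_attributes'.format(order.capitalize(), typ.upper())

pivoted_sdf.select(
    'id',
    *[func.col(c).alias(to_requested_name(c)) for c in pivoted_sdf.columns if c != 'id']
).show(truncate=False)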

CodePudding user response:

Desired solution:

from pyspark.sql import functions as F

data = [("1", "A", "Primary", "abc", "111"),
        ("1", "B", "Primary", "def", "222"),
        ("1", "A", "Secondary", "ghi", "333"),
        ("1", "B", "Secondary", "jkl", "444"),
        ("2", "A", "Primary", "mno", "555"),
        ("2", "A", "Secondary", "pqr", "666"),
        ("2", "B", "Primary", "stu", "777"),
        ("2", "B", "Secondary", "vwy", "888")]
schema = ["ID", "TYPE", "Order", "Address", "Phone"]
df_source = spark.createDataFrame(data, schema)

# the map keys used in every pivoted cell
expr_array = F.array(F.lit("Address"), F.lit("Phone"))

df_fnl = df_source.withColumn("collection", F.concat_ws('_', 'order', 'type'))\
    .groupBy("id").pivot("collection")\
    .agg(F.map_from_arrays(expr_array,
                           F.array_union(F.collect_list("Address"), F.collect_list("Phone"))))

# append "_attributes" to every pivoted column name
df_fnl.select([F.col(col).alias(col + "_attributes") if col != 'id' else col for col in df_fnl.columns]).show(10, 0)

OUTPUT:

+---+------------------------------+------------------------------+------------------------------+------------------------------+
|id |Primary_A_attributes          |Primary_B_attributes          |Secondary_A_attributes        |Secondary_B_attributes        |
+---+------------------------------+------------------------------+------------------------------+------------------------------+
|1  |{Address -> abc, Phone -> 111}|{Address -> def, Phone -> 222}|{Address -> ghi, Phone -> 333}|{Address -> jkl, Phone -> 444}|
|2  |{Address -> mno, Phone -> 555}|{Address -> stu, Phone -> 777}|{Address -> pqr, Phone -> 666}|{Address -> vwy, Phone -> 888}|
+---+------------------------------+------------------------------+------------------------------+------------------------------+
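
One caveat: array_union removes duplicate elements, so if an Address value and a Phone value ever coincided within a group, the value array would come up one element short and map_from_arrays would fail on mismatched lengths. Assuming that risk matters for real data, F.concat preserves duplicates and is a drop-in replacement for F.array_union here:

# hedged alternative: F.concat keeps duplicates, unlike F.array_union
df_fnl = df_source.withColumn("collection", F.concat_ws('_', 'order', 'type'))\
    .groupBy("id").pivot("collection")\
    .agg(F.map_from_arrays(expr_array,
                           F.concat(F.collect_list("Address"), F.collect_list("Phone"))))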

