PySpark JSON Struct

Time:12-03

I am trying to create a single JSON document from a DataFrame that has 3 entries for 1 customer:


+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+
|CustomerId|EmailPreference|EmailType|AddressPreference|AddressType|PhonePreference|PhoneType|        attribute|                from|                to|action|
+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+
|  C1000001|        Primary|     Home|             null|       null|           null|     null|     EmailAddress|   [email protected]| [email protected]|UPDATE|
|  C1000001|           null|     null|             null|       null|        Primary|     Home|      PhoneNumber|          8177777777|        8168888888|UPDATE|
|  C1000001|           null|     null|             null|       null|        Primary|     Home|FormatPhoneNumber|       (816)777-7777|     (816)888-8888|UPDATE|
+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+

This is our updatesDF. I want to build a struct from it so that each customer produces exactly 1 JSON entry. Here, 1 customer has 3 updates. This is what I tried:

from pyspark.sql import functions as F

json_df = updatesDF.select(
    F.col("CustomerId").alias("CustomerId"),
    # "action",
    "PhonePreference",
    "EmailPreference",
    # One struct per row holding the phone-side fields
    F.struct(
        F.col("PhoneType"),
        F.col("PhonePreference"),
        F.col("Attribute"),
        F.col("From"),
        F.col("To"),
    ).alias("PhoneDetails"),
    # One struct per row holding the email-side fields
    F.struct(
        F.col("EmailType"),
        F.col("EmailPreference"),
        F.col("Attribute"),
        F.col("From"),
        F.col("To"),
    ).alias("EmailDetails"),
).groupBy(
    # Grouping keys include the preference columns, not just CustomerId
    "CustomerId",
    "PhonePreference",
    "EmailPreference",
).agg(
    F.collect_list("PhoneDetails").alias("PhoneDetails"),
    F.collect_list("EmailDetails").alias("EmailDetails"),
)

The problem with this code is that I get 2 JSON rows as output, but I need a single output for 1 customer.

Could you please help me get just 1 JSON document, with all email changes under EmailDetails and all phone changes under PhoneDetails?

CodePudding user response:

You get 2 rows because your groupBy keys (CustomerId, PhonePreference, EmailPreference) have two distinct combinations in this data:

  • (C1000001, null, Primary)
  • (C1000001, Primary, null)

One workaround is to group by CustomerId only and, in the aggregation, apply first("EmailPreference", ignorenulls=True) and first("PhonePreference", ignorenulls=True) so that each preference column keeps its first non-null value.

CodePudding user response:

You're halfway there. All you have to do is group by CustomerId one more time and wrap both PhoneDetails and EmailDetails in another struct:

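    # Continues from json_df, the result of the first groupBy in the question
    # (result_df is just an illustrative name)
    result_df = (
        json_df
        .select(
            'CustomerId',
            F.struct(
                F.col('PhoneDetails'),
                F.col('EmailDetails')
            ).alias('updates')
        )
        .groupBy('CustomerId')
        .agg(F.collect_list('updates').alias('updates'))
    )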