PySpark groupby merge JSON into single object

Time:04-20

I have this data frame.

uid status   count amount
14  success  1     50
14  failed   2     60

This is the output I want

uid   result
14    {"successful_count": 1, "successful_amount": 50, "failure_count": 2, 
      "failure_amount": 60}

I have tried this so far.

from pyspark.sql.functions import col, from_json, struct, to_json, when
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField("successful_amount", DoubleType(), True),
    StructField("successful_count", IntegerType(), True),
    StructField("failure_amount", DoubleType(), True),
    StructField("failure_count", IntegerType(), True),
])

# Build a per-row JSON string whose keys depend on the status, then parse it back into a struct
df_amount.withColumn("res", when(col("status") == "success", to_json(struct(col("amount").alias("successful_amount"), col("count").alias("successful_count"))))\
                                  .when(col("status") == "failed", to_json(struct(col("amount").alias("failure_amount"), col("count").alias("failure_count")))))\
         .withColumn("res", from_json(col("res"), schema))

This has added one more struct column 'res' but now I am not sure how to groupby and combine two struct/JSON into one object. Do I need to write a custom aggregate function? Is there any easier/better way to get the output?

Answer:

Option 1:

This approach uses collect_list to gather the rows of each uid into lists, and a UDF to build the dictionary you are looking for:

from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import StringType

# First, collect the relevant columns into lists, grouped by uid
grouped_df = df_amount.groupby('uid').agg(collect_list('status').alias("name"), collect_list('count').alias("count"), collect_list('amount').alias("amount"))
# This yields a PySpark dataframe with all values for a uid in the same row
grouped_df.show()

This is the resulting pyspark dataframe:

+---+-----------------+------+--------+
|uid|             name| count|  amount|
+---+-----------------+------+--------+
| 14|[success, failed]|[1, 2]|[50, 60]|
+---+-----------------+------+--------+

After that, create the column containing the dictionary you are looking for:

# Custom function that builds the dictionary from the values of each list column
@udf(StringType())
def new_column(name, count, amount):
    nr = dict()
    for i in range(len(name)):
        nr[name[i] + "_amount"] = amount[i]
        nr[name[i] + "_count"] = count[i]
    return nr

result = grouped_df.withColumn("result", new_column(grouped_df['name'], grouped_df['count'], grouped_df['amount'])).select("uid", "result")
result.show(truncate = False)

The resulting data frame:

+---+----------------------------------------------------------------------+
|uid|result                                                                |
+---+----------------------------------------------------------------------+
|14 |{success_count=1, failed_count=2, success_amount=50, failed_amount=60}|
+---+----------------------------------------------------------------------+
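Note that the string above is the text form of a map rather than JSON, and its keys use success_/failed_ instead of the successful_/failure_ names asked for in the question. A minimal sketch of a plain Python helper that could replace the body of new_column is shown below. The name build_result_json and the KEY_PREFIX mapping are hypothetical, introduced only for illustration; the sketch assumes the three lists come from collect_list and are aligned by position.

```python
import json

# Hypothetical mapping from the status values in the question to the key
# prefixes requested in the desired output.
KEY_PREFIX = {"success": "successful", "failed": "failure"}


def build_result_json(names, counts, amounts):
    """Merge the parallel lists for one uid into a single JSON string."""
    merged = {}
    for name, count, amount in zip(names, counts, amounts):
        # Statuses without a mapping keep their own name as prefix.
        prefix = KEY_PREFIX.get(name, name)
        merged[prefix + "_count"] = count
        merged[prefix + "_amount"] = amount
    return json.dumps(merged)


print(build_result_json(["success", "failed"], [1, 2], [50, 60]))
# {"successful_count": 1, "successful_amount": 50, "failure_count": 2, "failure_amount": 60}
```

To use it on grouped_df, the helper could be wrapped with udf(build_result_json, StringType()) and applied to the name, count and amount columns in the same way as new_column.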

Option 2:

You could also join the success rows and the failed rows on uid, so that all values for a uid end up in the same row, and then build the res column from those columns with the field names you are looking for. Note that the left join used here keeps only the uids that have a success row:

from pyspark.sql.functions import col, struct, to_json

# Split the dataframe by status and rename the value columns so each side carries its own names
df_success = df_amount.filter(col("status") == "success").withColumnRenamed("count", "successful_count").withColumnRenamed("amount", "successful_amount").drop("status")
df_failed = df_amount.filter(col("status") == "failed").withColumnRenamed("count", "failure_count").withColumnRenamed("amount", "failure_amount").drop("status")
# Join both sides on uid to get all the values in the same row
df_amount_joined = df_success.join(df_failed, on="uid", how="left")
df_amount_joined.show()

Getting the resulting dataframe:

+---+----------------+-----------------+-------------+--------------+
|uid|successful_count|successful_amount|failure_count|failure_amount|
+---+----------------+-----------------+-------------+--------------+
| 14|               1|               50|            2|            60|
+---+----------------+-----------------+-------------+--------------+

Create the last column using struct as you did in your example:

# Finally create the column as you did in your example
df_final = df_amount_joined.withColumn("res", to_json(struct(col("successful_count"), col("successful_amount"), col("failure_count"), col("failure_amount")))).select("uid", "res")
df_final.show(truncate = False)

That will leave you with the dataframe you were looking for:

+---+-----------------------------------------------------------------------------------+
|uid|res                                                                                |
+---+-----------------------------------------------------------------------------------+
|14 |{"successful_count":1,"successful_amount":50,"failure_count":2,"failure_amount":60}|
+---+-----------------------------------------------------------------------------------+