I have this data frame:

uid  status   count  amount
14   success  1      50
14   failed   2      60

This is the output I want:

uid  result
14   {"successful_count": 1, "successful_amount": 50, "failure_count": 2, "failure_amount": 60}
I have tried this so far.
from pyspark.sql.functions import col, when, to_json, from_json, struct
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType

schema = StructType([
    StructField("successful_amount", DoubleType(), True),
    StructField("successful_count", IntegerType(), True),
    StructField("failure_amount", DoubleType(), True),
    StructField("failure_count", IntegerType(), True)
])

df_amount.withColumn("res", when(col("status") == "success",
                                 to_json(struct(col("amount").alias("successful_amount"),
                                                col("count").alias("successful_count"))))
                            .when(col("status") == "failed",
                                  to_json(struct(col("amount").alias("failure_amount"),
                                                 col("count").alias("failure_count"))))) \
          .withColumn("res", from_json(col("res"), schema))
This adds a struct column 'res', but now I am not sure how to group by uid and combine the two struct/JSON values into one object. Do I need to write a custom aggregate function? Is there an easier/better way to get the output?
CodePudding user response:
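For reference, the snippets below assume the input dataframe from the question can be reproduced like this (a minimal sketch; the name df_amount and the column names are taken from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Minimal reproduction of the input dataframe shown in the question
df_amount = spark.createDataFrame(
    [(14, "success", 1, 50), (14, "failed", 2, 60)],
    ["uid", "status", "count", "amount"]
)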
Option 1:
This approach uses an aggregate function to group the rows by uid, and a UDF to build the dictionary you are looking for:
from pyspark.sql.functions import udf, collect_list

# First, collect the relevant columns into lists, grouped by uid
grouped_df = df_amount.groupby('uid').agg(
    collect_list('status').alias("name"),
    collect_list('count').alias("count"),
    collect_list('amount').alias("amount")
)
# The result is a dataframe with all the values for each uid in a single row
grouped_df.show()
This is the resulting pyspark dataframe:
+---+-----------------+------+--------+
|uid|             name| count|  amount|
+---+-----------------+------+--------+
| 14|[success, failed]|[1, 2]|[50, 60]|
+---+-----------------+------+--------+
After that, create the column containing the dictionary you are looking for:
from pyspark.sql.types import StringType

# Custom UDF that builds the dictionary from the values of each collected list
@udf(StringType())
def new_column(name, count, amount):
    nr = dict()
    for i in range(len(name)):
        nr[name[i] + "_amount"] = amount[i]
        nr[name[i] + "_count"] = count[i]
    return nr

result = grouped_df.withColumn(
    "result",
    new_column(grouped_df['name'], grouped_df['count'], grouped_df['amount'])
).select("uid", "result")
result.show(truncate=False)
The resulting data frame:
+---+----------------------------------------------------------------------+
|uid|result                                                                |
+---+----------------------------------------------------------------------+
|14 |{success_count=1, failed_count=2, success_amount=50, failed_amount=60}|
+---+----------------------------------------------------------------------+
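Note that this result is the string representation of the dictionary rather than valid JSON, and the keys are derived from the status values (success/failed) instead of the successful_*/failure_* keys from the question. If you need a real JSON string, one option (a sketch, not part of the original answer) is to serialize the dictionary inside the UDF:

import json

# Hypothetical variant of the UDF above that returns a JSON string instead of a dict
@udf(StringType())
def new_column(name, count, amount):
    nr = dict()
    for i in range(len(name)):
        nr[name[i] + "_amount"] = amount[i]
        nr[name[i] + "_count"] = count[i]
    return json.dumps(nr)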
Option 2:
You could also build a dataframe by joining the rows that share the same uid but have different status values. After that, create the res column matching the schema you are looking for:
# Filter the dataframe for each value of the status column and join the results,
# so that all the values end up in the same row. Rename the columns as needed.
df_success = df_amount.filter(col("status") == "success").withColumnRenamed("count", "successful_count").withColumnRenamed("amount", "successful_amount")
df_failed = df_amount.filter(col("status") == "failed").withColumnRenamed("count", "failure_count").withColumnRenamed("amount", "failure_amount")
df_amount_joined = df_success.join(df_failed, on="uid", how="left").drop("status")
df_amount_joined.show()
This gives the following dataframe:
+---+----------------+-----------------+-------------+--------------+
|uid|successful_count|successful_amount|failure_count|failure_amount|
+---+----------------+-----------------+-------------+--------------+
| 14|               1|               50|            2|            60|
+---+----------------+-----------------+-------------+--------------+
Create the last column using struct as you did in your example:
# Finally create the res column with to_json and struct, as in your example
df_final = df_amount_joined.withColumn("res", to_json(struct(
    col("successful_count"), col("successful_amount"),
    col("failure_count"), col("failure_amount")))).select("uid", "res")
df_final.show(truncate=False)
That will leave you with the dataframe you were looking for:
+---+-----------------------------------------------------------------------------------+
|uid|res                                                                                |
+---+-----------------------------------------------------------------------------------+
|14 |{"successful_count":1,"successful_amount":50,"failure_count":2,"failure_amount":60}|
+---+-----------------------------------------------------------------------------------+
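If you would rather keep res as a struct column instead of a JSON string, you could also parse it back with from_json and the schema from your question (a sketch; df_struct is just an illustrative name, and the amounts will come back as doubles because the schema declares DoubleType):

# Optional: parse the JSON string into a struct column using the question's schema
df_struct = df_final.withColumn("res", from_json(col("res"), schema))
df_struct.printSchema()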