Create new rows based on aggregate of child transactions in pyspark

Time:03-16

I have a data frame in pyspark like below

df = spark.createDataFrame(
    [
        ("ABC", 1809, '100.00'),
        ("ABC", 851, '89.00'),
        ("ABC", 852, '10.00'),
        ("BBC", 1810, '10.00'),
        ("BBC", 951, '8.90'),
        ("BBC", 852, '1.00'),
        ("XXX", 1810, '10.00'),
        ("XXX", 951, '9.00'),
        ("XXX", 852, '1.00')],
    ("EMC", "TRAN", "amount"))

df.show()

+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
+---+----+------+

Rules:
1) 1809 and 1810 are parent tran codes
2) 851 and 852 are child codes associated with 1809
3) 951 and 852 are child codes associated with 1810

A child code can be associated with many parent codes.

I want to do the following:

1) Find the aggregate amount of the child codes for each `EMC`.
2) Check whether the aggregate child amount equals the parent code amount.
3) If it is equal, do nothing.
4) If it is not equal, add the difference as a new record with TRAN code `999`.
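As a concrete check of the arithmetic for one group (values taken from the example data above): for `EMC = ABC`, parent code 1809 carries 100.00 while children 851 and 852 sum to 99.00, so a 999 row for the 1.00 difference is needed. A minimal plain-Python sketch of that check (variable names are illustrative only):

```python
# Per-group check for the ABC group from the example data.
parent = 100.00               # amount on parent code 1809
children = [89.00, 10.00]     # amounts on child codes 851 and 852

diff = round(parent - sum(children), 2)
if diff != 0:
    new_row = ("ABC", 999, diff)  # row to append with TRAN code 999
```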

Expected final data frame:

+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
|ABC| 999|  1.00|
|BBC| 999|  0.10|
+---+----+------+

I am not able to figure out how to do this.

CodePudding user response:

from pyspark.sql import functions as F

differences = (
    df.groupBy("EMC")
      .agg(
          # parent amounts count positive, child amounts negative,
          # so the sum is parent minus the total of its children
          F.sum(
              F.when(F.col("TRAN") > 1000, F.col("amount").cast("double"))
               .otherwise(-F.col("amount").cast("double"))
          ).alias("amount")
      )
      .filter(F.col("amount") != 0)  # skip groups that already balance
      .select(F.col("EMC"), F.lit(999).alias("TRAN"), F.col("amount"))
)

result = df.unionByName(differences)

I've used the condition `F.col("TRAN") > 1000` to decide whether a row is a parent or a child; change it if you need different logic.
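To sanity-check the signed-sum trick without a Spark session, the same aggregation can be replayed in plain Python over the question's data (amounts as floats here, and the same `TRAN > 1000` parent test as above):

```python
rows = [
    ("ABC", 1809, 100.00), ("ABC", 851, 89.00), ("ABC", 852, 10.00),
    ("BBC", 1810, 10.00), ("BBC", 951, 8.90), ("BBC", 852, 1.00),
    ("XXX", 1810, 10.00), ("XXX", 951, 9.00), ("XXX", 852, 1.00),
]

diffs = {}
for emc, tran, amount in rows:
    # parent amounts count positive, child amounts negative
    signed = amount if tran > 1000 else -amount
    diffs[emc] = round(diffs.get(emc, 0.0) + signed, 2)

# only groups that do not balance get a 999 row
new_rows = [(emc, 999, amt) for emc, amt in diffs.items() if amt != 0]
```

This reproduces the two extra rows from the expected output (ABC gets 1.00, BBC gets 0.10) and, thanks to the non-zero filter, no row for XXX, whose children already balance the parent.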
