I have a data frame in PySpark, shown below:
df = spark.createDataFrame(
[
("ABC",1809,'100.00'),
("ABC",851,'89.00'),
("ABC",852, '10.00'),
("BBC",1810,'10.00'),
("BBC",951,'8.90'),
("BBC",852, '1.00'),
("XXX",1810,'10.00'),
("XXX",951,'9.00'),
("XXX",852, '1.00')],
("EMC", "TRAN", "amount"))
df.show()
+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
+---+----+------+
Rules:
1) 1809 and 1810 are parent tran codes
2) 851 and 852 are child codes associated with 1809
3) 951 and 852 are child codes associated with 1810
A child code can be associated with many parent codes.
I want to do the following:
1) Sum the amounts of the child codes for each `EMC`.
2) Check whether the summed child amount equals the parent code's amount.
3) If it is equal, do nothing.
4) If it is not equal, add the difference as a new record with tran code `999`.
Expected final data frame:
+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
|ABC| 999|  1.00|
|BBC| 999|  0.10|
+---+----+------+
I am not able to figure out how to do this.
CodePudding user response:
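from pyspark.sql import functions as F

# amount is stored as a string, so cast it to a decimal before doing arithmetic.
amount = F.col("amount").cast("decimal(10,2)")
# Parents (TRAN > 1000) count positively, children negatively.
signed = F.when(F.col("TRAN") > 1000, amount).otherwise(-amount)

differences = (
    df.groupBy("EMC")
    .agg(F.sum(signed).alias("amount"))
    .filter(F.col("amount") != 0)  # balanced groups such as XXX get no 999 row
    # Cast back to string so the column type matches df before the union.
    .select(F.col("EMC"), F.lit(999).alias("TRAN"), F.col("amount").cast("string").alias("amount"))
)
result = df.unionByName(differences)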
I've used the condition `F.col("TRAN") > 1000` to tell whether a code is a parent or a child; change it if you need different logic.
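To sanity-check which `999` rows should come out, the same arithmetic can be sketched in plain Python without a Spark session. This is only an illustration under a few assumptions: the name `balancing_rows` and the explicit `PARENT_CODES` set are hypothetical (the set stands in for the `TRAN > 1000` test), and `Decimal` is used so that `10.00 - 8.90 - 1.00` comes out as exactly `0.10`.

```python
from collections import defaultdict
from decimal import Decimal

# Same sample rows as in the question: (EMC, TRAN, amount as a string).
rows = [
    ("ABC", 1809, "100.00"), ("ABC", 851, "89.00"), ("ABC", 852, "10.00"),
    ("BBC", 1810, "10.00"), ("BBC", 951, "8.90"), ("BBC", 852, "1.00"),
    ("XXX", 1810, "10.00"), ("XXX", 951, "9.00"), ("XXX", 852, "1.00"),
]

# Assumption: parents are listed explicitly instead of using TRAN > 1000.
PARENT_CODES = {1809, 1810}


def balancing_rows(rows, parent_codes=PARENT_CODES, balance_code=999):
    """Return one (EMC, balance_code, difference) row per unbalanced EMC."""
    diff = defaultdict(Decimal)  # Decimal() is zero
    for emc, tran, amount in rows:
        value = Decimal(amount)
        # Parents count positively, children negatively.
        diff[emc] += value if tran in parent_codes else -value
    # Keep only the groups whose children do not add up to the parent.
    return [(emc, balance_code, str(d)) for emc, d in diff.items() if d != 0]


print(balancing_rows(rows))
# [('ABC', 999, '1.00'), ('BBC', 999, '0.10')]
```

`XXX` produces no row because its children sum exactly to the parent, which matches the expected data frame in the question.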