I have the two DataFrames below and would like to apply a mapping-based condition to them and return the result as a PySpark DataFrame.
df1.show()
+---+-------+--------+
|id |tr_type|nominal |
+---+-------+--------+
|1  |K      |2.0     |
|2  |ZW     |7.0     |
|3  |V      |12.5    |
|4  |VW     |9.0     |
|5  |CI     |5.0     |
+---+-------+--------+
One-dimensional mapping: *abcefgh
+-------+------------+------------+-----------+
|odm_id |return_value|odm_relation|input_value|
+-------+------------+------------+-----------+
|abcefgh|B           |EQ          |K          |
|abcefgh|B           |EQ          |ZW         |
|abcefgh|S           |EQ          |V          |
|abcefgh|S           |EQ          |VW         |
|abcefgh|I           |EQ          |CI         |
+-------+------------+------------+-----------+
I need to apply the condition below: the nominal volume is negated when the transaction is a sell, i.e. when the mapping returns 'S' for the row's tr_type.
IF (tr_type, $abcefgh.) == 'S' THEN ; nominal = -nominal ;
The expected output:
+---+-------+-------+-----------+
|id |tr_type|nominal|nominal_new|
+---+-------+-------+-----------+
|1  |K      |2.0    |2.0        |
|2  |ZW     |7.0    |7.0        |
|3  |V      |12.5   |-12.5      |
|4  |VW     |9.0    |-9.0       |
|5  |CI     |5.0    |5.0        |
+---+-------+-------+-----------+
CodePudding user response:
You could join the two DataFrames on tr_type == input_value and use when().otherwise() to create the new column. See the example below, which uses your samples (data_sdf is your df1, odm_sdf is the mapping table):
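from pyspark.sql import functions as func

# left join keeps every row of data_sdf, even if its tr_type has no mapping
data_sdf. \
    join(odm_sdf.selectExpr('return_value', 'input_value as tr_type').
         dropDuplicates(),
         ['tr_type'],
         'left'
         ). \
    withColumn('nominal_new',
               # negate nominal only where the mapping returned 'S' (sell)
               func.when(func.col('return_value') == 'S', func.col('nominal') * -1).
               otherwise(func.col('nominal'))
               ). \
    drop('return_value'). \
    show()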
# +-------+---+-------+-----------+
# |tr_type| id|nominal|nominal_new|
# +-------+---+-------+-----------+
# |      K|  1|    2.0|        2.0|
# |     CI|  5|    5.0|        5.0|
# |      V|  3|   12.5|      -12.5|
# |     VW|  4|    9.0|       -9.0|
# |     ZW|  2|    7.0|        7.0|
# +-------+---+-------+-----------+
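
If it helps to sanity-check the expected values without a Spark session, the same lookup-and-negate logic can be sketched in plain Python. This is only an illustration, not part of the PySpark solution: the names DATA_ROWS, ODM_ROWS, build_lookup and apply_sell_negation are made up for this sketch, and it assumes that only rows with odm_relation == 'EQ' should be matched (the join above does not filter on that column, but every sample row is EQ, so the results agree).

```python
# Rows copied from the question's sample tables.
DATA_ROWS = [
    (1, "K", 2.0),
    (2, "ZW", 7.0),
    (3, "V", 12.5),
    (4, "VW", 9.0),
    (5, "CI", 5.0),
]
ODM_ROWS = [
    ("abcefgh", "B", "EQ", "K"),
    ("abcefgh", "B", "EQ", "ZW"),
    ("abcefgh", "S", "EQ", "V"),
    ("abcefgh", "S", "EQ", "VW"),
    ("abcefgh", "I", "EQ", "CI"),
]


def build_lookup(odm_rows):
    """Map input_value -> return_value (assumption: only EQ relations count)."""
    return {inp: ret for _, ret, rel, inp in odm_rows if rel == "EQ"}


def apply_sell_negation(data_rows, odm_rows):
    """Return (id, tr_type, nominal, nominal_new) tuples."""
    lookup = build_lookup(odm_rows)
    result = []
    for row_id, tr_type, nominal in data_rows:
        # An unmapped tr_type keeps its nominal, mirroring the left join.
        is_sell = lookup.get(tr_type) == "S"
        result.append((row_id, tr_type, nominal, -nominal if is_sell else nominal))
    return result


for row in apply_sell_negation(DATA_ROWS, ODM_ROWS):
    print(row)
```

Only the V and VW rows map to 'S', so only their nominal values change sign, which matches the expected output in the question.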