I have two dataframes: df1, and a separate dataframe of exchange rates, df2:
#df1#
from_curr | to_curr | Date | value_to_convert |
---|---|---|---|
AED | EUR | 2017-01-12 | 2000 |
AED | EUR | 2018-03-20 | 189 |
UAD | EUR | 2021-05-18 | 12.5 |
DZD | EUR | 2017-01-12 | 130 |
SEK | EUR | 2017-01-12 | 1000 |
GNF | EUR | 2017-08-03 | 1300 |
df2: #currency_table#
from_curr | to_curr | Date | rate_exchange |
---|---|---|---|
AED | EUR | 2017-01-01 | -5,123 |
UAD | EUR | 2021-05-26 | -9.5 |
AED | EUR | 2018-03-10 | -5,3 |
DZD | EUR | 2017-01-01 | -6,12 |
GNF | EUR | 2017-08-01 | -7,03 |
SEK | EUR | 2017-01-29 | -12 |
I would like to create a PySpark function that converts value_to_convert from df1 using rate_exchange from currency_table, looking up the rate for the corresponding currency and date. The two dataframes should be joined on the from_curr and date fields, so that each value is converted with the rate_exchange for the right date, to get df3 like:
from_curr | to_curr | Date | value_to_convert | converted_value |
---|---|---|---|---|
AED | EUR | 2017-01-12 | 2000 | 390 |
AED | EUR | 2018-03-20 | 189 | 35,66 |
UAD | EUR | 2021-05-18 | 12.5 | 1,31 |
DZD | EUR | 2017-01-12 | 130 | 21,24 |
SEK | EUR | 2017-01-12 | 1000 | 83,33 |
GNF | EUR | 2017-08-03 | 1300 | 184,92 |
where converted_value = (value_to_convert) / (|rate_exchange|).
Do you have any idea, please?
CodePudding user response:
Here's a quick-and-dirty way of doing it with a join().
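from pyspark.sql import functions as func, Window as wd

# data_ls: the df1 rows as (from_curr, to_curr, dt, val_to_convert) tuples; spark is an active SparkSession
data_sdf = spark.sparkContext.parallelize(data_ls). \
    toDF(['from_curr', 'to_curr', 'dt', 'val_to_convert']). \
    withColumn('dt', func.col('dt').cast('date'))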
# +---------+-------+----------+--------------+
# |from_curr|to_curr|        dt|val_to_convert|
# +---------+-------+----------+--------------+
# |      AED|    EUR|2017-01-12|        2000.0|
# |      AED|    EUR|2018-03-20|         189.0|
# |      UAD|    EUR|2021-05-18|          12.5|
# |      DZD|    EUR|2017-01-12|         130.0|
# |      SEK|    EUR|2017-01-12|        1000.0|
# |      GNF|    EUR|2017-08-03|        1300.0|
# +---------+-------+----------+--------------+
# curr_ls: the currency_table rows as (from_curr, to_curr, dt, rate_exchange) tuples
curr_sdf = spark.sparkContext.parallelize(curr_ls). \
    toDF(['from_curr', 'to_curr', 'dt', 'rate_exchange']). \
    withColumn('dt', func.col('dt').cast('date')). \
    withColumnRenamed('dt', 'from_curr_start_dt')
# +---------+-------+------------------+-------------+
# |from_curr|to_curr|from_curr_start_dt|rate_exchange|
# +---------+-------+------------------+-------------+
# |      AED|    EUR|        2017-01-01|       -5.123|
# |      UAD|    EUR|        2021-05-26|         -9.5|
# |      AED|    EUR|        2018-03-10|         -5.3|
# |      DZD|    EUR|        2017-01-01|        -6.12|
# |      GNF|    EUR|        2017-08-01|        -7.03|
# |      SEK|    EUR|        2017-01-29|        -12.0|
# +---------+-------+------------------+-------------+
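We join the two dataframes on the currency columns (not on dates), so every exchange-rate date for a currency pair is mapped to each row with that pair. From there, we retain only the exchange-rate date closest to the row's date, using datediff() and a window min(). The closest date may fall before or after the row's date, and if two rate dates are equally close, both rows are kept.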
# 1. pair every row with all rates for its currency pair
# 2. keep the rate whose date is nearest to the row's date (min absolute day difference)
# 3. divide the value by the absolute rate
data_sdf. \
    join(curr_sdf, ['from_curr', 'to_curr'], 'left'). \
    withColumn('dt_diff', func.abs(func.datediff('dt', 'from_curr_start_dt'))). \
    withColumn('min_dt_diff', func.min('dt_diff').over(wd.partitionBy('from_curr', 'to_curr', 'dt'))). \
    filter(func.col('dt_diff') == func.col('min_dt_diff')). \
    withColumn('converted_value', func.col('val_to_convert') / func.abs(func.col('rate_exchange'))). \
    drop('dt_diff', 'min_dt_diff'). \
    show()
# +---------+-------+----------+--------------+------------------+-------------+------------------+
# |from_curr|to_curr|        dt|val_to_convert|from_curr_start_dt|rate_exchange|   converted_value|
# +---------+-------+----------+--------------+------------------+-------------+------------------+
# |      GNF|    EUR|2017-08-03|        1300.0|        2017-08-01|        -7.03| 184.9217638691323|
# |      AED|    EUR|2017-01-12|        2000.0|        2017-01-01|       -5.123| 390.3962521959789|
# |      DZD|    EUR|2017-01-12|         130.0|        2017-01-01|        -6.12|21.241830065359476|
# |      AED|    EUR|2018-03-20|         189.0|        2018-03-10|         -5.3|35.660377358490564|
# |      UAD|    EUR|2021-05-18|          12.5|        2021-05-26|         -9.5|1.3157894736842106|
# |      SEK|    EUR|2017-01-12|        1000.0|        2017-01-29|        -12.0| 83.33333333333333|
# +---------+-------+----------+--------------+------------------+-------------+------------------+
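If you want to sanity-check the nearest-date logic without a Spark session, here is a minimal plain-Python sketch of the same idea, using the sample rows from the question. The helper name `convert_with_nearest_rate` and the tuple layouts are assumptions made for illustration only. One difference to be aware of: when two rate dates are equally close, this sketch takes the first match, whereas the Spark filter keeps both rows.

```python
from datetime import date

# Sample rows from the question: (from_curr, to_curr, dt, val_to_convert)
data_ls = [
    ("AED", "EUR", date(2017, 1, 12), 2000.0),
    ("AED", "EUR", date(2018, 3, 20), 189.0),
    ("UAD", "EUR", date(2021, 5, 18), 12.5),
    ("DZD", "EUR", date(2017, 1, 12), 130.0),
    ("SEK", "EUR", date(2017, 1, 12), 1000.0),
    ("GNF", "EUR", date(2017, 8, 3), 1300.0),
]

# Exchange-rate rows: (from_curr, to_curr, rate_dt, rate_exchange)
curr_ls = [
    ("AED", "EUR", date(2017, 1, 1), -5.123),
    ("UAD", "EUR", date(2021, 5, 26), -9.5),
    ("AED", "EUR", date(2018, 3, 10), -5.3),
    ("DZD", "EUR", date(2017, 1, 1), -6.12),
    ("GNF", "EUR", date(2017, 8, 1), -7.03),
    ("SEK", "EUR", date(2017, 1, 29), -12.0),
]


def convert_with_nearest_rate(rows, rates):
    """Hypothetical helper: divide each value by |rate| of the closest rate date."""
    out = []
    for from_curr, to_curr, dt, value in rows:
        # Candidate rates for the same currency pair (the "join" step).
        candidates = [r for r in rates if r[0] == from_curr and r[1] == to_curr]
        if not candidates:
            continue  # no rate for this pair, so the row is skipped
        # Keep the rate whose date is closest, before or after (the "datediff" step).
        nearest = min(candidates, key=lambda r: abs((dt - r[2]).days))
        out.append((from_curr, to_curr, dt, value, value / abs(nearest[3])))
    return out


result = convert_with_nearest_rate(data_ls, curr_ls)
for row in result:
    print(row[0], row[2], round(row[4], 2))
```

The printed values should line up with the converted_value column from the Spark output once rounded to two decimals.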