I would like to join two dataframes on the currency pair and the date, taking the exchange rate from the second dataframe. I have tried the approach cited here, but datediff only gives the difference between the dates, so it doesn't give me the right rate.
df1:
from_curr | to_curr | Date | value_to_convert |
---|---|---|---|
AED | EUR | 2017-03-24 | 2000 |
AED | EUR | 2017-03-27 | 189 |
DZD | EUR | 2017-01-12 | 130 |
EUR | EUR | 2020-01-01 | 11 |
df2 (currency_table):
transacti | local | DateTra | rate_exchange |
---|---|---|---|
AED | EUR | 2017-03-24 | -5.123 |
AED | EUR | 2017-03-26 | -9.5 |
DZD | EUR | 2017-01-01 | -6.12 |
The output should look like this:
from_curr | to_curr | Date | value_to_convert | value_converted |
---|---|---|---|---|
AED | EUR | 2017-03-24 | 2000 | 390.39 |
AED | EUR | 2017-03-27 | 189 | 19.89 |
DZD | EUR | 2017-01-12 | 130 | 21.24 |
EUR | EUR | 2020-01-01 | 11 | 11 |
The only method that works is subtracting the two dates, "Date" and "DateTra", and taking the row whose "DateTra" is closest to "Date". Could you please propose a much cleaner method than subtracting strings?
CodePudding user response:
You could aggregate your smaller dataframe (df2) in order to collect all the dates and rates into one cell. Then, join dataframes, take out what you need and do the division.
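To make the "all dates and rates in one cell" idea concrete, here is a minimal plain-Python sketch of the aggregation step, with no Spark involved. The helper name `collect_rates` is hypothetical and only used for illustration; it roughly mimics what grouping by currency pair and collecting (date, rate) pairs yields for the sample data.

```python
from collections import defaultdict

# Sample rows from df2: (transacti, local, DateTra, rate_exchange)
rows = [
    ("AED", "EUR", "2017-03-24", -5.123),
    ("AED", "EUR", "2017-03-26", -9.5),
    ("DZD", "EUR", "2017-01-01", -6.12),
]


def collect_rates(rows):
    """Group (date, rate) pairs under their (from, to) currency pair.

    Hypothetical helper: one dict entry per currency pair, holding the
    list of all dated rates, like one array cell per group.
    """
    grouped = defaultdict(list)
    for transacti, local, date_tra, rate in rows:
        grouped[(transacti, local)].append((date_tra, rate))
    return dict(grouped)


rates_by_pair = collect_rates(rows)
print(rates_by_pair[("AED", "EUR")])
```

After this step there is exactly one entry per currency pair, so joining it to df1 cannot multiply rows.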
Inputs:
from pyspark.sql import functions as F
df1 = spark.createDataFrame(
[('AED', 'EUR', '2017-03-24', 2000),
('AED', 'EUR', '2017-03-27', 189),
('DZD', 'EUR', '2017-01-12', 130),
('EUR', 'EUR', '2020-01-01', 11)],
['from_curr', 'to_curr', 'Date', 'value_to_convert'])
df2 = spark.createDataFrame(
[('AED', 'EUR', '2017-03-24', -5.123),
('AED', 'EUR', '2017-03-26', -9.5),
('DZD', 'EUR', '2017-01-01', -6.12)],
['transacti', 'local', 'DateTra', 'rate_exchange'])
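Script which gets the closest day's rate (which could be from the future):
# Collect every (date, rate) pair of a currency pair into one array column.
# A new name is used so that the original df2 stays available.
df2_agg = df2.groupBy('transacti', 'local').agg(
    F.collect_list(F.struct('DateTra', 'rate_exchange')).alias('_vals')
)
# Sort by absolute day difference; on a tie the later date wins
# because its negated timestamp is smaller.
rate = F.array_sort(F.transform(
    '_vals',
    lambda x: F.struct(
        F.abs(F.datediff('Date', x.DateTra)).alias('diff'),
        (-F.unix_timestamp(x.DateTra, 'yyyy-MM-dd')).alias('DateTra'),
        F.abs(x.rate_exchange).alias('rate_exchange')
    )
))[0]['rate_exchange']
df = (df1
    .join(df2_agg, (df1.from_curr == df2_agg.transacti) & (df1.to_curr == df2_agg.local), 'left')
    .select(
        df1['*'],
        # No rate found: keep the value as-is when both currencies are equal.
        F.coalesce(
            F.col('value_to_convert') / rate,
            F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
        ).alias('value_converted')
    )
)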
df.show()
# +---------+-------+----------+----------------+------------------+
# |from_curr|to_curr|      Date|value_to_convert|   value_converted|
# +---------+-------+----------+----------------+------------------+
# |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
# |      AED|    EUR|2017-03-27|             189|19.894736842105264|
# |      EUR|    EUR|2020-01-01|              11|              11.0|
# |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
# +---------+-------+----------+----------------+------------------+
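The selection rule used by the closest-day script can be checked outside Spark. The following is a minimal plain-Python sketch, assuming the rule is "smallest absolute day difference, later date on a tie"; the function name `closest_rate` is hypothetical and not part of any library.

```python
from datetime import date


def closest_rate(target, dated_rates):
    """Return the absolute rate whose date is nearest to `target`.

    Ties go to the later date, mirroring a sort key of
    (day difference, negated date).
    """
    def sort_key(item):
        day, _ = item
        return (abs((target - day).days), -day.toordinal())

    _, rate = min(dated_rates, key=sort_key)
    return abs(rate)


aed_eur = [(date(2017, 3, 24), -5.123), (date(2017, 3, 26), -9.5)]

# 2017-03-27 is 1 day from 03-26 and 3 days from 03-24, so 9.5 is used.
converted = 189 / closest_rate(date(2017, 3, 27), aed_eur)
print(round(converted, 2))
```

This agrees with the second AED row of the sample output, where 189 is divided by 9.5.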
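Script which gets the most recent rate, but never one from the future:
# Sort each currency pair's (date, rate) structs newest first.
# A new name is used so that the original df2 stays available.
df2_agg = df2.groupBy('transacti', 'local').agg(
    F.sort_array(F.collect_list(F.struct('DateTra', 'rate_exchange')), False).alias('_vals')
)
# The first struct dated on or before 'Date' holds the most recent usable rate.
rate = F.abs(F.filter('_vals', lambda x: x.DateTra <= F.col('Date'))[0]['rate_exchange'])
df = (df1
    .join(df2_agg, (df1.from_curr == df2_agg.transacti) & (df1.to_curr == df2_agg.local), 'left')
    .select(
        df1['*'],
        # No rate found: keep the value as-is when both currencies are equal.
        F.coalesce(
            F.col('value_to_convert') / rate,
            F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
        ).alias('value_converted')
    )
)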
df.show()
# +---------+-------+----------+----------------+------------------+
# |from_curr|to_curr|      Date|value_to_convert|   value_converted|
# +---------+-------+----------+----------------+------------------+
# |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
# |      AED|    EUR|2017-03-27|             189|19.894736842105264|
# |      EUR|    EUR|2020-01-01|              11|              11.0|
# |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
# +---------+-------+----------+----------------+------------------+
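Both scripts give the same result on the sample data, so the difference only shows on other dates. Here is a minimal plain-Python sketch of the "most recent, not from the future" rule, assuming dates are ISO `yyyy-MM-dd` strings (which sort chronologically as plain strings). The function name `latest_rate_on_or_before` is hypothetical.

```python
from bisect import bisect_right


def latest_rate_on_or_before(target, dated_rates):
    """Return the absolute rate of the latest entry dated <= target.

    `dated_rates` must be sorted ascending by ISO date string.
    Returns None when every known rate lies in the future.
    """
    dates = [d for d, _ in dated_rates]
    idx = bisect_right(dates, target)
    if idx == 0:
        return None
    return abs(dated_rates[idx - 1][1])


aed_eur = [("2017-03-24", -5.123), ("2017-03-26", -9.5)]

print(latest_rate_on_or_before("2017-03-25", aed_eur))  # 5.123
print(latest_rate_on_or_before("2017-03-23", aed_eur))  # None
```

For 2017-03-25 this rule picks 5.123, whereas a closest-day rule that prefers the later date on a tie would pick 9.5. For a date before the first known rate it returns nothing, which corresponds to a null rate in the Spark version.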