I want to join two Spark dataframes that each have millions of rows. Both share an 'id' column and both have a 'date' column, but the dates in the two tables may not match. If a record in the first table has no matching date in the second table, the 'value' column should be taken from the most recent earlier observation in the second table. Therefore, I cannot simply join on 'id' and 'date'. I have created sample dataframes below. What is the optimal way to do this, given that the data is huge?
import pandas as pd
a = pd.DataFrame({'id':[1,2,3,1,2,3,1,2,3, 1,2,3], 'date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-08', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31', '2020-01-31']})
a = spark.createDataFrame(a)
b = pd.DataFrame({'id':[1,2,3,1,2,1,3,1,2], 'date': ['2019-12-25', '2019-12-25', '2019-12-25', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31'], 'value': [0.1,0.2,0.3,1,2,10,30,0.1,0.2]})
b = spark.createDataFrame(b)
required_result = pd.DataFrame({'id':[1,2,3,1,2,3,1,2,3, 1,2,3], 'date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-08', '2020-01-08', '2020-01-08', '2020-01-21', '2020-01-21', '2020-01-21', '2020-01-31', '2020-01-31', '2020-01-31'],
'value': [0.1,0.2,0.3, 1,2,0.3,10, 2,30,0.1,0.2, 30]})
CodePudding user response:
You could join on id and keep only the dates from the second dataframe that are equal to or earlier than the first dataframe's date.
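from pyspark.sql import functions as func
from pyspark.sql import Window as wd

# data1_sdf and data2_sdf correspond to `a` and `b` from the question.
# dates_diff = date_b - date (always <= 0 here), so its maximum per
# (id, date) marks the closest date_b on or before the row's date.
data1_sdf.join(data2_sdf.withColumnRenamed('date', 'date_b'),
               [data1_sdf.id == data2_sdf.id,
                data1_sdf.date >= func.col('date_b')],
               'left'
               ). \
    drop(data2_sdf.id). \
    withColumn('dates_diff', func.datediff('date_b', 'date')). \
    withColumn('max_dtdiff',
               func.max('dates_diff').over(wd.partitionBy('id', 'date'))
               ). \
    filter(func.col('max_dtdiff') == func.col('dates_diff')). \
    drop('dates_diff', 'max_dtdiff'). \
    orderBy('id', 'date'). \
    show()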
# +---+----------+----------+-----+
# | id|      date|    date_b|value|
# +---+----------+----------+-----+
# |  1|2020-01-01|2019-12-25|  0.1|
# |  1|2020-01-08|2020-01-08|  1.0|
# |  1|2020-01-21|2020-01-21| 10.0|
# |  1|2020-01-31|2020-01-31|  0.1|
# |  2|2020-01-01|2019-12-25|  0.2|
# |  2|2020-01-08|2020-01-08|  2.0|
# |  2|2020-01-21|2020-01-08|  2.0|
# |  2|2020-01-31|2020-01-31|  0.2|
# |  3|2020-01-01|2019-12-25|  0.3|
# |  3|2020-01-08|2019-12-25|  0.3|
# |  3|2020-01-21|2020-01-21| 30.0|
# |  3|2020-01-31|2020-01-21| 30.0|
# +---+----------+----------+-----+
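To make the two steps of this answer easier to follow, here is a minimal plain-Python sketch of the same logic on the question's sample data: a range join on id with date_b <= date, then keeping the latest date_b per (id, date). It only illustrates the semantics and is not Spark code; the names a_rows, b_rows and asof_by_range_join are made up for this sketch.

```python
from collections import defaultdict

# Sample data from the question as plain tuples (hypothetical names).
ids = [1, 2, 3]
a_dates = ['2020-01-01', '2020-01-08', '2020-01-21', '2020-01-31']
a_rows = [(i, d) for d in a_dates for i in ids]
b_rows = [
    (1, '2019-12-25', 0.1), (2, '2019-12-25', 0.2), (3, '2019-12-25', 0.3),
    (1, '2020-01-08', 1.0), (2, '2020-01-08', 2.0),
    (1, '2020-01-21', 10.0), (3, '2020-01-21', 30.0),
    (1, '2020-01-31', 0.1), (2, '2020-01-31', 0.2),
]


def asof_by_range_join(a_rows, b_rows):
    # Step 1: range join. Keep every b row with the same id and date_b <= date.
    candidates = defaultdict(list)
    for a_id, a_date in a_rows:
        for b_id, b_date, value in b_rows:
            if a_id == b_id and b_date <= a_date:  # ISO dates sort as strings
                candidates[(a_id, a_date)].append((b_date, value))
    # Step 2: per (id, date), keep the candidate with the latest date_b.
    return {key: max(pairs) for key, pairs in candidates.items()}


result = asof_by_range_join(a_rows, b_rows)
for (row_id, date), (date_b, value) in sorted(result.items()):
    print(row_id, date, date_b, value)
```

In this sketch a row with no earlier date_b simply gets no entry. In the Spark version such a row would have a null date_b after the left join, and the equality filter would most likely drop it as well, so it is worth checking that case on your own data.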
CodePudding user response:
It seems that you can join on id alone, as this key looks well distributed. You could first aggregate df b per id, join both dfs, then filter and extract the value with the max date.
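from pyspark.sql import functions as F

# Collect each id's [date, value] pairs into a single array column
b = b.groupBy('id').agg(F.collect_list(F.array('date', 'value')).alias('dv'))
df = a.join(b, 'id', 'left')
# Keep the pairs dated on or before the row's date, then take the largest
# pair; arrays compare element by element, so the latest date wins.
# F.filter with a Python lambda needs Spark 3.1+.
df = df.select(
    a['*'],
    F.array_max(F.filter('dv', lambda x: x[0] <= F.col('date')))[1].alias('value')
)
df.show()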
# +---+----------+-----+
# | id|      date|value|
# +---+----------+-----+
# |  1|2020-01-01|  0.1|
# |  1|2020-01-08|  1.0|
# |  3|2020-01-01|  0.3|
# |  3|2020-01-08|  0.3|
# |  2|2020-01-01|  0.2|
# |  2|2020-01-08|  2.0|
# |  1|2020-01-21| 10.0|
# |  1|2020-01-31|  0.1|
# |  3|2020-01-21| 30.0|
# |  3|2020-01-31| 30.0|
# |  2|2020-01-21|  2.0|
# |  2|2020-01-31|  0.2|
# +---+----------+-----+
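Grouping b per id turns the problem into a lookup inside one small list per key. The plain-Python sketch below illustrates that idea on the sample data, but it swaps in a different technique from the answer: it sorts each list and uses a binary search (bisect) instead of filter plus array_max. It is not Spark code, and by_id, dates_by_id and latest_value are hypothetical names chosen for this sketch.

```python
from bisect import bisect_right
from collections import defaultdict

# Rows of `b` from the question as (id, date, value) tuples.
b_rows = [
    (1, '2019-12-25', 0.1), (2, '2019-12-25', 0.2), (3, '2019-12-25', 0.3),
    (1, '2020-01-08', 1.0), (2, '2020-01-08', 2.0),
    (1, '2020-01-21', 10.0), (3, '2020-01-21', 30.0),
    (1, '2020-01-31', 0.1), (2, '2020-01-31', 0.2),
]

# Group by id (like groupBy('id') + collect_list), then sort each group by date.
by_id = defaultdict(list)
for b_id, b_date, value in b_rows:
    by_id[b_id].append((b_date, value))
for pairs in by_id.values():
    pairs.sort()
dates_by_id = {k: [d for d, _ in pairs] for k, pairs in by_id.items()}


def latest_value(row_id, date):
    """Value of the latest b row with this id dated on or before `date`, else None."""
    dates = dates_by_id.get(row_id, [])
    pos = bisect_right(dates, date)  # count of b rows dated on or before `date`
    return by_id[row_id][pos - 1][1] if pos else None


print(latest_value(3, '2020-01-08'))
print(latest_value(2, '2020-01-21'))
print(latest_value(1, '2019-01-01'))
```

The last call asks for a date earlier than any observation for that id and returns None, which corresponds to a null value in the joined result.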