Considering that I have the following DF:
|-----------------|
|Date | Cod |
|-----------------|
|2022-08-01 | A |
|2022-08-02 | A |
|2022-08-03 | A |
|2022-08-04 | A |
|2022-08-05 | A |
|2022-08-01 | B |
|2022-08-02 | B |
|2022-08-03 | B |
|2022-08-04 | B |
|2022-08-05 | B |
|-----------------|
And considering a backward observation window of 2 days, how can I generate the following output DF?
|------------------------------|
|RefDate | Date | Cod |
|------------------------------|
|2022-08-03 | 2022-08-01 | A |
|2022-08-03 | 2022-08-02 | A |
|2022-08-03 | 2022-08-03 | A |
|2022-08-04 | 2022-08-02 | A |
|2022-08-04 | 2022-08-03 | A |
|2022-08-04 | 2022-08-04 | A |
|2022-08-05 | 2022-08-03 | A |
|2022-08-05 | 2022-08-04 | A |
|2022-08-05 | 2022-08-05 | A |
|2022-08-03 | 2022-08-01 | B |
|2022-08-03 | 2022-08-02 | B |
|2022-08-03 | 2022-08-03 | B |
|2022-08-04 | 2022-08-02 | B |
|2022-08-04 | 2022-08-03 | B |
|2022-08-04 | 2022-08-04 | B |
|2022-08-05 | 2022-08-03 | B |
|2022-08-05 | 2022-08-04 | B |
|2022-08-05 | 2022-08-05 | B |
|------------------------------|
I know that I can use loops to generate this output DF, but loops perform poorly here because I can't cache the DF in memory (my original DF has approximately 6 billion rows). So, what is the best way to get this output?
MVCE:
from pyspark.sql.types import StructType, StructField, StringType

data_1=[
("2022-08-01","A"),
("2022-08-02","A"),
("2022-08-03","A"),
("2022-08-04","A"),
("2022-08-05","A"),
("2022-08-01","B"),
("2022-08-02","B"),
("2022-08-03","B"),
("2022-08-04","B"),
("2022-08-05","B")
]
schema_1 = StructType([
StructField("Date", StringType(),True),
StructField("Cod", StringType(),True)
])
df_1 = spark.createDataFrame(data=data_1,schema=schema_1)
CodePudding user response:
You could try a self join. Provided your cluster and session are configured well, it should work with 6B rows.
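from pyspark.sql import functions as func

# data_sdf is the input DataFrame (df_1 in the question)
data_sdf.alias('a'). \
    join(data_sdf.alias('b'),
         [func.col('a.cod') == func.col('b.cod'),
          # keep rows of b dated 0 to 2 days before the reference date in a
          func.datediff(func.col('a.date'), func.col('b.date')).between(0, 2)],
         'inner'
         ). \
    drop(func.col('a.cod')). \
    selectExpr('cod', 'a.date as ref_date', 'b.date as date'). \
    show()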
# +---+----------+----------+
# |cod|  ref_date|      date|
# +---+----------+----------+
# |  B|2022-08-01|2022-08-01|
# |  B|2022-08-02|2022-08-01|
# |  B|2022-08-02|2022-08-02|
# |  B|2022-08-03|2022-08-01|
# |  B|2022-08-03|2022-08-02|
# |  B|2022-08-03|2022-08-03|
# |  B|2022-08-04|2022-08-02|
# |  B|2022-08-04|2022-08-03|
# |  B|2022-08-04|2022-08-04|
# |  B|2022-08-05|2022-08-03|
# |  B|2022-08-05|2022-08-04|
# |  B|2022-08-05|2022-08-05|
# |  A|2022-08-01|2022-08-01|
# |  A|2022-08-02|2022-08-01|
# |  A|2022-08-02|2022-08-02|
# |  A|2022-08-03|2022-08-01|
# |  A|2022-08-03|2022-08-02|
# |  A|2022-08-03|2022-08-03|
# |  A|2022-08-04|2022-08-02|
# |  A|2022-08-04|2022-08-03|
# +---+----------+----------+
# only showing top 20 rows
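This also generates records for the first 2 dates of each cod (here 2022-08-01 and 2022-08-02), whose look-back windows are incomplete. Those records can be discarded, for example by keeping only rows whose ref_date is at least 2 days after the earliest date of that cod.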
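To sanity-check the join on a small sample, a plain-Python reference of the same rule can help. The sketch below is only an illustration and is not meant for the 6B-row DF: `expand_lookback` and `lookback_days` are hypothetical names, and it assumes at most one row per (Date, Cod) pair. It pairs each reference date with the dates 0 to 2 days before it for the same cod, and skips reference dates whose window would start before the first observed date.

```python
from collections import defaultdict
from datetime import date, timedelta


def expand_lookback(rows, lookback_days=2):
    """rows: iterable of (date_str, cod) tuples.

    Returns a list of (ref_date, date, cod) tuples as ISO strings,
    ordered by cod, then ref_date, then date.
    """
    # Collect the observed dates for each cod.
    dates_by_cod = defaultdict(set)
    for date_str, cod in rows:
        dates_by_cod[cod].add(date.fromisoformat(date_str))

    out = []
    for cod in sorted(dates_by_cod):
        dates = dates_by_cod[cod]
        # First reference date whose window does not reach before the data.
        first_full_ref = min(dates) + timedelta(days=lookback_days)
        for ref in sorted(dates):
            if ref < first_full_ref:
                continue  # incomplete window: discard
            # Same condition as datediff(ref, d) between 0 and lookback_days.
            for offset in range(lookback_days, -1, -1):
                d = ref - timedelta(days=offset)
                if d in dates:
                    out.append((ref.isoformat(), d.isoformat(), cod))
    return out


# Same sample as in the question: 2022-08-01 to 2022-08-05 for A and B.
sample = [(f"2022-08-0{day}", cod) for cod in ("A", "B") for day in range(1, 6)]
result = expand_lookback(sample)
```

On the sample from the question, `result` holds 18 tuples in the order of the desired output DF, so it can be compared against the collected output of the self join after the incomplete windows are removed.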