Create a dataframe based on X days of backward observation


Considering that I have the following DF:

| Date       | Cod |
|------------|-----|
| 2022-08-01 | A   |
| 2022-08-02 | A   |
| 2022-08-03 | A   |
| 2022-08-04 | A   |
| 2022-08-05 | A   |
| 2022-08-01 | B   |
| 2022-08-02 | B   |
| 2022-08-03 | B   |
| 2022-08-04 | B   |
| 2022-08-05 | B   |

And considering that I have a backward observation window of 2 days, how can I generate the following output DF?

| RefDate    | Date       | Cod |
|------------|------------|-----|
| 2022-08-03 | 2022-08-01 | A   |
| 2022-08-03 | 2022-08-02 | A   |
| 2022-08-03 | 2022-08-03 | A   |
| 2022-08-04 | 2022-08-02 | A   |
| 2022-08-04 | 2022-08-03 | A   |
| 2022-08-04 | 2022-08-04 | A   |
| 2022-08-05 | 2022-08-03 | A   |
| 2022-08-05 | 2022-08-04 | A   |
| 2022-08-05 | 2022-08-05 | A   |
| 2022-08-03 | 2022-08-01 | B   |
| 2022-08-03 | 2022-08-02 | B   |
| 2022-08-03 | 2022-08-03 | B   |
| 2022-08-04 | 2022-08-02 | B   |
| 2022-08-04 | 2022-08-03 | B   |
| 2022-08-04 | 2022-08-04 | B   |
| 2022-08-05 | 2022-08-03 | B   |
| 2022-08-05 | 2022-08-04 | B   |
| 2022-08-05 | 2022-08-05 | B   |

I know that I can use loops to generate this output DF, but loops don't perform well here since I can't cache the DF in memory (my original DF has approximately 6 billion rows). So, what is the best way to get this output?

MVCE:

from pyspark.sql.types import StructType, StructField, StringType

data_1 = [
    ("2022-08-01", "A"),
    ("2022-08-02", "A"),
    ("2022-08-03", "A"),
    ("2022-08-04", "A"),
    ("2022-08-05", "A"),
    ("2022-08-01", "B"),
    ("2022-08-02", "B"),
    ("2022-08-03", "B"),
    ("2022-08-04", "B"),
    ("2022-08-05", "B")
]

schema_1 = StructType([
    StructField("Date", StringType(), True),
    StructField("Cod", StringType(), True)
])

df_1 = spark.createDataFrame(data=data_1, schema=schema_1)
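
One caveat with the MVCE: Date is a plain string. Spark's datediff should implicitly cast ISO-formatted strings, but if you prefer an explicit DateType column, a minimal sketch (my addition, not part of the original MVCE):

from pyspark.sql import functions as func

# Optional: cast the string column to a real DateType column
# (datediff also accepts ISO-formatted strings, so this is not required).
df_1 = df_1.withColumn("Date", func.to_date("Date", "yyyy-MM-dd"))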

CodePudding user response:

You could try a self join. My thoughts: if your cluster and session are configured optimally, it should work even with 6B rows.

from pyspark.sql import functions as func

# Self join on cod, keeping pairs where b.date falls within the 2 days
# before (and including) a.date.
df_1.alias('a'). \
    join(df_1.alias('b'),
         [func.col('a.cod') == func.col('b.cod'),
          func.datediff(func.col('a.date'), func.col('b.date')).between(0, 2)],
         'inner'
         ). \
    drop(func.col('a.cod')). \
    selectExpr('cod', 'a.date as ref_date', 'b.date as date'). \
    show()

# +---+----------+----------+
# |cod|  ref_date|      date|
# +---+----------+----------+
# |  B|2022-08-01|2022-08-01|
# |  B|2022-08-02|2022-08-01|
# |  B|2022-08-02|2022-08-02|
# |  B|2022-08-03|2022-08-01|
# |  B|2022-08-03|2022-08-02|
# |  B|2022-08-03|2022-08-03|
# |  B|2022-08-04|2022-08-02|
# |  B|2022-08-04|2022-08-03|
# |  B|2022-08-04|2022-08-04|
# |  B|2022-08-05|2022-08-03|
# |  B|2022-08-05|2022-08-04|
# |  B|2022-08-05|2022-08-05|
# |  A|2022-08-01|2022-08-01|
# |  A|2022-08-02|2022-08-01|
# |  A|2022-08-02|2022-08-02|
# |  A|2022-08-03|2022-08-01|
# |  A|2022-08-03|2022-08-02|
# |  A|2022-08-03|2022-08-03|
# |  A|2022-08-04|2022-08-02|
# |  A|2022-08-04|2022-08-03|
# +---+----------+----------+
# only showing top 20 rows

This will also generate records for the initial 2 ref dates, whose backward windows are incomplete; those can be discarded, e.g. with the sketch below.
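
For instance (a sketch of my own, assuming the join result above is assigned to a variable, here hypothetically named joined, instead of calling .show() directly), you could keep only ref dates at least 2 days after each cod's earliest date:

from pyspark.sql import Window
from pyspark.sql import functions as func

# 'joined' is assumed to hold the self-join result above (cod, ref_date, date).
# Keep only ref_dates with a full 2-day backward window, i.e. ref_dates at
# least 2 days after the earliest date observed for that cod.
w = Window.partitionBy('cod')
result = joined \
    .withColumn('min_date', func.min('date').over(w)) \
    .filter(func.datediff('ref_date', 'min_date') >= 2) \
    .drop('min_date')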
