Look back X days and get column values based on a condition in Spark


I have the following DF:

Id |Date               |Value   |cond  |
---------------------------------------|
1  |2022-08-03 00:00:00|     "A"| 1    |
1  |2022-08-04 00:00:00|     "B"| 2    |
1  |2022-08-05 00:00:00|     "C"| 1    |
1  |2022-08-06 00:00:00|     "D"| 1    |
1  |2022-08-07 00:00:00|     "E"| 1    |
1  |2022-08-08 00:00:00|     "F"| 1    |
2  |2022-08-03 00:00:00|     "G"| 1    |
2  |2022-08-04 00:00:00|     "H"| 2    |
2  |2022-08-05 00:00:00|     "I"| 1    |
2  |2022-08-06 00:00:00|     "J"| 1    |
2  |2022-08-07 00:00:00|     "K"| 1    |
2  |2022-08-08 00:00:00|     "L"| 1    |
----------------------------------------

And this one:

----------------------------|
|Date               | cond  |
----------------------------|
|2022-08-03 00:00:00| 1     |
|2022-08-04 00:00:00| 2     |
|2022-08-05 00:00:00| 1     |
|2022-08-06 00:00:00| 1     |
|2022-08-07 00:00:00| 1     |
|2022-08-08 00:00:00| 1     |
-----------------------------

Based on these two DFs I need to generate another one, always looking back four days based on Date, Cond and Id (Date, Cond and Id need to match in both DFs).

Using pure SQL I use OUTER APPLY (with loops) to generate the final "DF", but OUTER APPLY doesn't exist in Spark. I tried to build a solution using a window function, but with no success. My expected output is:

------------------------------------------------------
Id |Date               |Count| List View      | agg  |
-----------------------------------------------------|
1  |2022-08-03 00:00:00|0|null                |0     |
1  |2022-08-04 00:00:00|1|["A"]               |0     |
1  |2022-08-05 00:00:00|2|["A", "B"]          |0     |
1  |2022-08-06 00:00:00|3|["A", "B", "C"]     |0     |
1  |2022-08-07 00:00:00|4|["A", "B", "C", "D"]|"ABCD"|
1  |2022-08-08 00:00:00|4|["B", "C", "D", "E"]|"BCDE"|
2  |2022-08-03 00:00:00|0|null                |0     |
2  |2022-08-04 00:00:00|1|["G"]               |0     |
2  |2022-08-05 00:00:00|2|["G", "H"]          |0     |
2  |2022-08-06 00:00:00|3|["G", "H", "I"]     |0     |
2  |2022-08-07 00:00:00|4|["G", "I", "J", "K"]|"GIJK"|
2  |2022-08-08 00:00:00|4|["I", "J", "K", "L"]|"IJKL"|
------------------------------------------------------

The List View column isn't strictly necessary; I only included it because I think it makes it easier to generate the agg column (I couldn't think of a way to generate agg without an intermediate column like List View).
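An intermediate column is a reasonable approach. As a plain-Python sketch of the per-Id logic (a hypothetical helper `lookback`, not PySpark): collect up to four preceding values, and join them into the agg string only once the window is full.

```python
def lookback(values, window=4):
    """For each position, collect up to `window` preceding values and
    join them into a string only once the window is full (else 0)."""
    out = []
    for i in range(len(values)):
        prev = values[max(0, i - window):i]   # strictly before position i
        agg = "".join(prev) if len(prev) == window else 0
        out.append((len(prev), prev, agg))
    return out

# Id = 1, Values ordered by Date.
rows = lookback(["A", "B", "C", "D", "E", "F"])
```

For Id = 1 this reproduces the Count / List View / agg triples above: the 2022-08-07 row becomes `(4, ['A', 'B', 'C', 'D'], 'ABCD')` and the 2022-08-08 row `(4, ['B', 'C', 'D', 'E'], 'BCDE')`.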

My questions are:

  1. How to generate the output DF.
  2. What is the best way to generate the output DF.

MVCE to generate the input DFs in PySpark (the original snippet was missing the imports and `data_1`; `data_1` is reconstructed here from the first table, and Date is kept as a string because `DateType` rejects raw strings like "2022-08-03 00:00:00" — cast with `F.to_timestamp` if a real date column is needed):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data_1 = [
 ("1", "2022-08-03 00:00:00", "A", 1),
 ("1", "2022-08-04 00:00:00", "B", 2),
 ("1", "2022-08-05 00:00:00", "C", 1),
 ("1", "2022-08-06 00:00:00", "D", 1),
 ("1", "2022-08-07 00:00:00", "E", 1),
 ("1", "2022-08-08 00:00:00", "F", 1),
 ("2", "2022-08-03 00:00:00", "G", 1),
 ("2", "2022-08-04 00:00:00", "H", 2),
 ("2", "2022-08-05 00:00:00", "I", 1),
 ("2", "2022-08-06 00:00:00", "J", 1),
 ("2", "2022-08-07 00:00:00", "K", 1),
 ("2", "2022-08-08 00:00:00", "L", 1)
 ]

schema_1 = StructType([
    StructField("Id", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Value", StringType(), True),
    StructField("Cond", IntegerType(), True)
  ])

df_1 = spark.createDataFrame(data=data_1, schema=schema_1)

data_2 = [
 ("2022-08-03 00:00:00", 1),
 ("2022-08-04 00:00:00", 2),
 ("2022-08-05 00:00:00", 1),
 ("2022-08-06 00:00:00", 1),
 ("2022-08-07 00:00:00", 1),
 ("2022-08-08 00:00:00", 1)
 ]

schema_2 = StructType([
    StructField("Date", StringType(), True),
    StructField("Cond", IntegerType(), True)
  ])

df_2 = spark.createDataFrame(data=data_2, schema=schema_2)
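Since the question requires Date and Cond to match in both DFs, one reading is that df_2 simply filters df_1 before the look-back. A plain-Python sketch of that inner-join semantics (the equivalent PySpark call would be an inner join on the two key columns, e.g. `df_1.join(df_2, ["Date", "Cond"])` — an assumption about the intent, not something stated in the question):

```python
# Toy rows mirroring the MVCE shapes: (Id, Date, Value, Cond) and (Date, Cond).
df1_rows = [
    ("1", "2022-08-03 00:00:00", "A", 1),
    ("1", "2022-08-04 00:00:00", "B", 2),
    ("1", "2022-08-05 00:00:00", "C", 2),  # Cond deliberately absent from df2_rows
]
df2_rows = [("2022-08-03 00:00:00", 1), ("2022-08-04 00:00:00", 2)]

# Keep only df_1 rows whose (Date, Cond) pair also appears in df_2.
keys = set(df2_rows)
matched = [r for r in df1_rows if (r[1], r[3]) in keys]
```

Here `matched` keeps the "A" and "B" rows and drops "C", whose (Date, Cond) pair has no counterpart in df_2.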

CodePudding user response:

@OdiumPura

Can I use only df_1 to get the desired output?

If yes, I have the solution ready in PySpark, which gives the exact result you have shared.

Thanks

CodePudding user response:

I'm not sure why you need df_2, but here is a solution that gets you the expected output using pyspark.sql.Window.

from pyspark.sql import functions as F, Window

# An explicit frame like rowsBetween needs an ordered window, and ordering
# by Date is required anyway for a deterministic look-back.
windowSpec = Window.partitionBy("Id").orderBy("Date")
df_1 = df_1.withColumn("List View", F.collect_list("Value").over(windowSpec.rowsBetween(-4, -1))) \
        .withColumn("Count", F.size("List View")) \
        .withColumn("agg", F.when(F.col("Count") < 4, F.lit(0)).otherwise(F.array_join("List View", ""))) \
        .drop("Value", "Cond")

df_1.show()

Output is:

 --- ------------------- ------------ ----- ---- 
| Id|               Date|   List View|Count| agg|
 --- ------------------- ------------ ----- ---- 
|  1|2022-08-03 00:00:00|          []|    0|   0|
|  1|2022-08-04 00:00:00|         [A]|    1|   0|
|  1|2022-08-05 00:00:00|      [A, B]|    2|   0|
|  1|2022-08-06 00:00:00|   [A, B, C]|    3|   0|
|  1|2022-08-07 00:00:00|[A, B, C, D]|    4|ABCD|
|  1|2022-08-08 00:00:00|[B, C, D, E]|    4|BCDE|
|  2|2022-08-03 00:00:00|          []|    0|   0|
|  2|2022-08-04 00:00:00|         [G]|    1|   0|
|  2|2022-08-05 00:00:00|      [G, H]|    2|   0|
|  2|2022-08-06 00:00:00|   [G, H, I]|    3|   0|
|  2|2022-08-07 00:00:00|[G, H, I, J]|    4|GHIJ|
|  2|2022-08-08 00:00:00|[H, I, J, K]|    4|HIJK|
 --- ------------------- ------------ ----- ---- 
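One caveat: `rowsBetween(-4, -1)` looks back four *rows*, which equals four days only when every day is present, as in this sample. The question asks for a look-back "based on Date", which in PySpark would be a range frame over a numeric date column, e.g. `windowSpec.orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(-4 * 86400, -1)` (an assumption about the intent, not tested here). A plain-Python sketch of how the two frames diverge when a day is missing:

```python
from datetime import date, timedelta

# Toy series with 2022-08-06 missing.
rows = [(date(2022, 8, 3), "A"), (date(2022, 8, 4), "B"),
        (date(2022, 8, 5), "C"), (date(2022, 8, 7), "D"),
        (date(2022, 8, 8), "E")]

def prev_by_rows(rows, i, n=4):
    # Previous n rows, regardless of date gaps (like rowsBetween(-n, -1)).
    return [v for _, v in rows[max(0, i - n):i]]

def prev_by_days(rows, i, days=4):
    # Rows strictly before the current date and at most `days` days back
    # (like a rangeBetween frame over the date).
    d = rows[i][0]
    return [v for pd, v in rows if d - timedelta(days=days) <= pd < d]

prev_by_rows(rows, 4)  # ['A', 'B', 'C', 'D']
prev_by_days(rows, 4)  # ['B', 'C', 'D'] -- 'A' is five days back
```

With a gap in the dates the row-based frame still returns four values, while the day-based frame drops the one that falls outside the four-day range.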