I have the following DF:
Id | Date                | Value | Cond |
-----------------------------------------
1  | 2022-08-03 00:00:00 | "A"   | 1    |
1  | 2022-08-04 00:00:00 | "B"   | 2    |
1  | 2022-08-05 00:00:00 | "C"   | 1    |
1  | 2022-08-06 00:00:00 | "D"   | 1    |
1  | 2022-08-07 00:00:00 | "E"   | 1    |
1  | 2022-08-08 00:00:00 | "F"   | 1    |
2  | 2022-08-03 00:00:00 | "G"   | 1    |
2  | 2022-08-04 00:00:00 | "H"   | 2    |
2  | 2022-08-05 00:00:00 | "I"   | 1    |
2  | 2022-08-06 00:00:00 | "J"   | 1    |
2  | 2022-08-07 00:00:00 | "K"   | 1    |
2  | 2022-08-08 00:00:00 | "L"   | 1    |
-----------------------------------------
And this one:
Date                | Cond |
----------------------------
2022-08-03 00:00:00 | 1    |
2022-08-04 00:00:00 | 2    |
2022-08-05 00:00:00 | 1    |
2022-08-06 00:00:00 | 1    |
2022-08-07 00:00:00 | 1    |
2022-08-08 00:00:00 | 1    |
----------------------------
Based on these two DFs, I need to generate a third one, always looking back four days based on Date, Cond, and Id (Date, Cond, and Id must match in both DFs).
In pure SQL I use OUTER APPLY (with loops) to generate the final "DF", but OUTER APPLY doesn't exist in Spark. I tried to build a solution using a Window function, but with no success (see the sketch after the table below). My expected output is:
---------------------------------------------------------------
Id | Date                | Count | List View            | agg    |
---------------------------------------------------------------
1  | 2022-08-03 00:00:00 | 0     | null                 | 0      |
1  | 2022-08-04 00:00:00 | 1     | ["A"]                | 0      |
1  | 2022-08-05 00:00:00 | 2     | ["A", "B"]           | 0      |
1  | 2022-08-06 00:00:00 | 3     | ["A", "B", "C"]      | 0      |
1  | 2022-08-07 00:00:00 | 4     | ["A", "B", "C", "D"] | "ABCD" |
1  | 2022-08-08 00:00:00 | 4     | ["B", "C", "D", "E"] | "BCDE" |
2  | 2022-08-03 00:00:00 | 0     | null                 | 0      |
2  | 2022-08-04 00:00:00 | 1     | ["G"]                | 0      |
2  | 2022-08-05 00:00:00 | 2     | ["G", "H"]           | 0      |
2  | 2022-08-06 00:00:00 | 3     | ["G", "H", "I"]      | 0      |
2  | 2022-08-07 00:00:00 | 4     | ["G", "H", "I", "J"] | "GHIJ" |
2  | 2022-08-08 00:00:00 | 4     | ["H", "I", "J", "K"] | "HIJK" |
---------------------------------------------------------------
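For reference, the Spark building block that can replace a per-row OUTER APPLY lookback is a window frame. A minimal sketch (assuming a DF with the Id, Date, and Value columns above; not a full solution):

from pyspark.sql import functions as F, Window

# Per Id, in Date order, a frame covering the 4 rows before the current one:
# the same lookback that OUTER APPLY performs row by row in T-SQL.
lookback = Window.partitionBy("Id").orderBy("Date").rowsBetween(-4, -1)
previous_values = F.collect_list("Value").over(lookback)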
The List View column isn't strictly necessary; I only included it because I think it's easier to generate the agg column from the List View column (I couldn't think of a way to generate the agg column without an 'intermediate' column like List View).
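For what it's worth, agg can be built in a single expression, without materializing a List View column first. A minimal sketch, assuming df_1 as created in the MVCE below:

from pyspark.sql import functions as F, Window

w = Window.partitionBy("Id").orderBy("Date").rowsBetween(-4, -1)
vals = F.collect_list("Value").over(w)  # the windowed values, kept inline

# "0" while the frame holds fewer than 4 rows, otherwise the 4 values joined.
df_direct = df_1.withColumn(
    "agg", F.when(F.size(vals) < 4, F.lit("0")).otherwise(F.concat_ws("", vals))
)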
My questions are:
- How can I generate the output DF?
- What is the best way to generate it?
MVCE to generate the input DFs in PySpark:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import (IntegerType, StringType, StructField,
                               StructType, TimestampType)

spark = SparkSession.builder.getOrCreate()

# Rows taken from the first table above.
data_1 = [
    ("1", datetime(2022, 8, 3), "A", 1), ("1", datetime(2022, 8, 4), "B", 2),
    ("1", datetime(2022, 8, 5), "C", 1), ("1", datetime(2022, 8, 6), "D", 1),
    ("1", datetime(2022, 8, 7), "E", 1), ("1", datetime(2022, 8, 8), "F", 1),
    ("2", datetime(2022, 8, 3), "G", 1), ("2", datetime(2022, 8, 4), "H", 2),
    ("2", datetime(2022, 8, 5), "I", 1), ("2", datetime(2022, 8, 6), "J", 1),
    ("2", datetime(2022, 8, 7), "K", 1), ("2", datetime(2022, 8, 8), "L", 1),
]
schema_1 = StructType([
    StructField("Id", StringType(), True),
    StructField("Date", TimestampType(), True),  # timestamps, per the tables above
    StructField("Value", StringType(), True),
    StructField("Cond", IntegerType(), True)
])
df_1 = spark.createDataFrame(data=data_1, schema=schema_1)

data_2 = [
    (datetime(2022, 8, 3), 1), (datetime(2022, 8, 4), 2),
    (datetime(2022, 8, 5), 1), (datetime(2022, 8, 6), 1),
    (datetime(2022, 8, 7), 1), (datetime(2022, 8, 8), 1)
]
schema_2 = StructType([
    StructField("Date", TimestampType(), True),
    StructField("Cond", IntegerType(), True)
])
df_2 = spark.createDataFrame(data=data_2, schema=schema_2)
CodePudding user response:
@OdiumPura, can I use only df_1 to get the desired output? If yes, I have a PySpark solution ready that gives the exact result you shared. Thanks.
CodePudding user response:
I'm not sure why you need df_2 (see the note after the output below), but here is a solution that gets you the expected output using pyspark.sql.Window:
from pyspark.sql import functions as F, Window

# Ordering by Date makes the 4-row lookback frame deterministic.
windowSpec = Window.partitionBy("Id").orderBy("Date")
df_1 = df_1.withColumn("List View", F.collect_list("Value").over(windowSpec.rowsBetween(-4, -1))) \
    .withColumn("Count", F.size("List View")) \
    .withColumn("agg", F.when(F.col("Count") < 4, F.lit(0)).otherwise(F.array_join("List View", ""))) \
    .drop("Value", "Cond")
df_1.show()
Output is:
+---+-------------------+------------+-----+----+
| Id|               Date|   List View|Count| agg|
+---+-------------------+------------+-----+----+
|  1|2022-08-03 00:00:00|          []|    0|   0|
|  1|2022-08-04 00:00:00|         [A]|    1|   0|
|  1|2022-08-05 00:00:00|      [A, B]|    2|   0|
|  1|2022-08-06 00:00:00|   [A, B, C]|    3|   0|
|  1|2022-08-07 00:00:00|[A, B, C, D]|    4|ABCD|
|  1|2022-08-08 00:00:00|[B, C, D, E]|    4|BCDE|
|  2|2022-08-03 00:00:00|          []|    0|   0|
|  2|2022-08-04 00:00:00|         [G]|    1|   0|
|  2|2022-08-05 00:00:00|      [G, H]|    2|   0|
|  2|2022-08-06 00:00:00|   [G, H, I]|    3|   0|
|  2|2022-08-07 00:00:00|[G, H, I, J]|    4|GHIJ|
|  2|2022-08-08 00:00:00|[H, I, J, K]|    4|HIJK|
+---+-------------------+------------+-----+----+
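If df_2 really is needed (for example as a filter on valid Date/Cond pairs), one option is to semi-join df_1 against it before applying the window. A sketch, assuming the column names from the question:

# Keep only df_1 rows whose (Date, Cond) pair also appears in df_2, then
# compute the window columns on the filtered frame exactly as above.
df_1 = df_1.join(df_2, on=["Date", "Cond"], how="left_semi")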