I have a Spark dataframe similar to the following:
id   claim_id              service_date                  status  product
123  10606134411906233408  2018-09-17T00:00:00.000+0000  PD      blue
123  10606147900401009928  2019-01-24T00:00:00.000+0000  PD      yellow
123  10606160940704723994  2019-05-23T00:00:00.000+0000  RV      yellow
123  10606171648203079553  2019-08-29T00:00:00.000+0000  RJ      blue
123  10606186611407311724  2020-01-13T00:00:00.000+0000  PD      blue
Forgive me for not pasting any of my attempts, as nothing has worked. I want to add a new column containing the max(service_date) of any previous row where the status is PD and the product of that row matches the product of the current row.
This is easily done with a correlated subquery, but that is inefficient and, in any case, not doable in Spark, which rejects correlated subqueries with non-equality predicates. Also note that LAG will not work, because I do not always need the immediately preceding record (the offset would be dynamic).
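For illustration, this is roughly the shape of the correlated subquery I mean (claims is a made-up temp view name; Spark rejects the non-equality correlation on service_date):
df.createOrReplaceTempView("claims")  # hypothetical view name
spark.sql("""
    SELECT c1.*,
           (SELECT MAX(c2.service_date)
              FROM claims c2
             WHERE c2.status = 'PD'
               AND c2.product = c1.product
               AND c2.service_date < c1.service_date) AS previous_service_date
      FROM claims c1
""")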
The expected output would be something such as this:
id   claim_id              service_date                  status  product  previous_service_date
123  10606134411906233408  2018-09-17T00:00:00.000+0000  PD      blue
123  10606147900401009928  2019-01-24T00:00:00.000+0000  PD      yellow
123  10606160940704723994  2019-05-23T00:00:00.000+0000  RV      yellow   2019-01-24T00:00:00.000+0000
123  10606171648203079553  2019-08-29T00:00:00.000+0000  RJ      blue     2018-09-17T00:00:00.000+0000
123  10606186611407311724  2020-01-13T00:00:00.000+0000  PD      blue     2018-09-17T00:00:00.000+0000
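For reference, a minimal sketch to reproduce this input (service_date kept as a string here; assumes an existing SparkSession named spark):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        ("123", "10606134411906233408", "2018-09-17T00:00:00.000+0000", "PD", "blue"),
        ("123", "10606147900401009928", "2019-01-24T00:00:00.000+0000", "PD", "yellow"),
        ("123", "10606160940704723994", "2019-05-23T00:00:00.000+0000", "RV", "yellow"),
        ("123", "10606171648203079553", "2019-08-29T00:00:00.000+0000", "RJ", "blue"),
        ("123", "10606186611407311724", "2020-01-13T00:00:00.000+0000", "PD", "blue"),
    ],
    ["id", "claim_id", "service_date", "status", "product"],
)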
CodePudding user response:
You may try the following, which uses max as a window function with when (a case expression) but restricts the window frame to the preceding rows:
from pyspark.sql import functions as F
from pyspark.sql import Window

df = df.withColumn(
    'previous_service_date',
    F.max(
        # null for non-PD rows, so they can never be picked as the max
        F.when(F.col("status") == "PD", F.col("service_date")).otherwise(None)
    ).over(
        Window.partitionBy("product")
        .orderBy('service_date')  # required so "preceding rows" is well-defined
        .rowsBetween(Window.unboundedPreceding, -1)  # every row before the current one
    )
)
df.orderBy('service_date').show(truncate=False)
+---+--------------------+-------------------+------+-------+---------------------+
|id |claim_id            |service_date       |status|product|previous_service_date|
+---+--------------------+-------------------+------+-------+---------------------+
|123|10606134411906233408|2018-09-17 00:00:00|PD    |blue   |null                 |
|123|10606147900401009928|2019-01-24 00:00:00|PD    |yellow |null                 |
|123|10606160940704723994|2019-05-23 00:00:00|RV    |yellow |2019-01-24 00:00:00  |
|123|10606171648203079553|2019-08-29 00:00:00|RJ    |blue   |2018-09-17 00:00:00  |
|123|10606186611407311724|2020-01-13 00:00:00|PD    |blue   |2018-09-17 00:00:00  |
+---+--------------------+-------------------+------+-------+---------------------+
Edit 1
You may also use last with ignorenulls=True, as shown below:
df = df.withColumn(
    'previous_service_date',
    F.last(
        F.when(F.col("status") == "PD", F.col("service_date")).otherwise(None),
        True  # ignorenulls=True, so null (non-PD) entries are skipped
    ).over(
        Window.partitionBy("product")
        .orderBy('service_date')
        .rowsBetween(Window.unboundedPreceding, -1)
    )
)
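Because the frame is ordered by service_date, the last non-null PD date in the frame is also the maximum, so both versions give the same result here.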
Let me know if this works for you.
CodePudding user response:
You can copy your DataFrame to a new DataFrame (df2) and join both as below:
(df.join(df2,
         on=[df.service_date > df2.service_date,  # only strictly earlier rows
             df.product == df2.product,           # same product
             df2.status == 'PD'],                 # the earlier row must be PD
         how="left"))
Then drop the duplicated columns and rename df2.service_date to previous_service_date. Note that several earlier PD rows can match each claim, so you will also need to aggregate with max to keep only the latest one.
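A minimal sketch of this approach, assuming df is the input shown above (renaming the copied columns avoids self-join ambiguity, and the groupBy/max step is my addition to collapse multiple matching PD rows into one previous_service_date):
from pyspark.sql import functions as F

# Copy with renamed columns so the self-join conditions are unambiguous
df2 = df.select(
    F.col("product").alias("product_2"),
    F.col("status").alias("status_2"),
    F.col("service_date").alias("service_date_2"),
)

result = (
    df.join(
        df2,
        on=[df.service_date > F.col("service_date_2"),  # strictly earlier rows
            df.product == F.col("product_2"),           # same product
            F.col("status_2") == "PD"],                 # earlier row must be PD
        how="left",
    )
    # several earlier PD rows may match; keep only the latest per claim
    .groupBy("id", "claim_id", "service_date", "status", "product")
    .agg(F.max("service_date_2").alias("previous_service_date"))
)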