Add column to Spark dataframe with the max value that is less than the current record's value


I have a Spark dataframe similar to the following:

id   claim_id              service_date                  status  product
123  10606134411906233408  2018-09-17T00:00:00.000+0000  PD      blue
123  10606147900401009928  2019-01-24T00:00:00.000+0000  PD      yellow
123  10606160940704723994  2019-05-23T00:00:00.000+0000  RV      yellow
123  10606171648203079553  2019-08-29T00:00:00.000+0000  RJ      blue
123  10606186611407311724  2020-01-13T00:00:00.000+0000  PD      blue

Forgive me for not pasting any code, as nothing I have tried has worked. I want to add a new column containing the max(service_date) over all previous rows where the status is PD and the previous row's product equals the current row's product.

This is easily done with a correlated subquery, but that is not efficient and, besides, is not doable in Spark because correlated subqueries do not support non-equality predicates. Also note that LAG will not work, because I do not always need the immediately preceding record (the offset would be dynamic).
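For illustration, the correlated-subquery logic described here would look roughly like the following (a sketch only, with a hypothetical temp view named claims; as noted, Spark rejects the non-equality correlated predicate, so this is shown purely to state the requirement):

# Illustration only: "claims" is a hypothetical view name, and Spark will raise
# an AnalysisException because p.service_date < c.service_date is a
# non-equality correlated predicate.
df.createOrReplaceTempView("claims")
spark.sql("""
    SELECT c.*,
           (SELECT MAX(p.service_date)
              FROM claims p
             WHERE p.product = c.product
               AND p.status = 'PD'
               AND p.service_date < c.service_date) AS previous_service_date
      FROM claims c
""")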

The expected output would be something such as this:

id   claim_id              service_date                  status  product  previous_service_date
123  10606134411906233408  2018-09-17T00:00:00.000+0000  PD      blue
123  10606147900401009928  2019-01-24T00:00:00.000+0000  PD      yellow
123  10606160940704723994  2019-05-23T00:00:00.000+0000  RV      yellow   2019-01-24T00:00:00.000+0000
123  10606171648203079553  2019-08-29T00:00:00.000+0000  RJ      blue     2018-09-17T00:00:00.000+0000
123  10606186611407311724  2020-01-13T00:00:00.000+0000  PD      blue     2018-09-17T00:00:00.000+0000
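For reference, a DataFrame matching the sample above can be built roughly as follows (a minimal sketch; the column types and timestamp format are assumed, since the question includes no code):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

data = [
    (123, "10606134411906233408", "2018-09-17T00:00:00.000+0000", "PD", "blue"),
    (123, "10606147900401009928", "2019-01-24T00:00:00.000+0000", "PD", "yellow"),
    (123, "10606160940704723994", "2019-05-23T00:00:00.000+0000", "RV", "yellow"),
    (123, "10606171648203079553", "2019-08-29T00:00:00.000+0000", "RJ", "blue"),
    (123, "10606186611407311724", "2020-01-13T00:00:00.000+0000", "PD", "blue"),
]
df = (spark.createDataFrame(data, ["id", "claim_id", "service_date", "status", "product"])
      # parse the ISO-8601 strings into timestamps; the pattern is assumed from the sample
      .withColumn("service_date", F.to_timestamp("service_date", "yyyy-MM-dd'T'HH:mm:ss.SSSZ")))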

CodePudding user response:

You may try the following, which uses max as a window function together with when (a case expression), with the window frame restricted to the preceding rows:

from pyspark.sql import functions as F
from pyspark.sql import Window


df = df.withColumn('previous_service_date', F.max(
    F.when(F.col("status") == "PD", F.col("service_date")).otherwise(None)
).over(
    Window.partitionBy("product")
          .orderBy('service_date')   # ordering is required for "preceding rows" to be meaningful
          .rowsBetween(Window.unboundedPreceding, -1)
))

df.orderBy('service_date').show(truncate=False)
+---+--------------------+-------------------+------+-------+---------------------+
|id |claim_id            |service_date       |status|product|previous_service_date|
+---+--------------------+-------------------+------+-------+---------------------+
|123|10606134411906233408|2018-09-17 00:00:00|PD    |blue   |null                 |
|123|10606147900401009928|2019-01-24 00:00:00|PD    |yellow |null                 |
|123|10606160940704723994|2019-05-23 00:00:00|RV    |yellow |2019-01-24 00:00:00  |
|123|10606171648203079553|2019-08-29 00:00:00|RJ    |blue   |2018-09-17 00:00:00  |
|123|10606186611407311724|2020-01-13 00:00:00|PD    |blue   |2018-09-17 00:00:00  |
+---+--------------------+-------------------+------+-------+---------------------+
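For what it's worth, the same window can also be written in SQL syntax via F.expr (equivalent to the code above, assuming the same column names):

# Same logic as above, expressed as a SQL window inside F.expr
df = df.withColumn(
    "previous_service_date",
    F.expr("""
        max(CASE WHEN status = 'PD' THEN service_date END)
        OVER (PARTITION BY product
              ORDER BY service_date
              ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
    """)
)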

Edit 1

You may also use last with ignorenulls=True (the second argument below), which returns the most recent preceding PD service_date:

df = df.withColumn('previous_service_date', F.last(
    F.when(F.col("status") == "PD", F.col("service_date")).otherwise(None), True
).over(
    Window.partitionBy("product")
          .orderBy('service_date')
          .rowsBetween(Window.unboundedPreceding, -1)
))

Let me know if this works for you.

CodePudding user response:

You can copy your DataFrame into a second DataFrame (df2) with renamed columns, so the self-join references are unambiguous, and join the two as below:

df2 = df.selectExpr("service_date as prev_service_date",   # copy of df with renamed columns
                    "product as product_2",
                    "status as status_2")

(df.join(df2,
         on=[df.service_date > df2.prev_service_date,
             df.product == df2.product_2,
             df2.status_2 == 'PD'],
         how="left"))

Then drop the extra join columns and rename prev_service_date to previous_service_date. Because the join can match several earlier PD rows per claim, aggregate with max to keep only the latest one, as sketched below.
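A minimal sketch of those final steps, reusing df2 from the join above and pyspark.sql.functions imported as F in the first answer (result is an assumed variable name):

result = (df.join(df2,
                  on=[df.service_date > df2.prev_service_date,
                      df.product == df2.product_2,
                      df2.status_2 == 'PD'],
                  how="left")
          # collapse back to one row per claim, keeping only the latest preceding PD date
          .groupBy("id", "claim_id", "service_date", "status", "product")
          .agg(F.max("prev_service_date").alias("previous_service_date")))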
