Adding a new column to a dataframe with a value which is based on the values from next rows

Time:06-17

I have a dataframe like the one below:

+-----+----------+---------+-------+-------------------+
|jobid|fieldmname|new_value|coltype|           createat|
+-----+----------+---------+-------+-------------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|
+-----+----------+---------+-------+-------------------+

// needed for .toDF on a local Seq (auto-imported in spark-shell)
import spark.implicits._

Seq(
  (1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
  (1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
  (1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
  (1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
  (1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
  (1, "jobstatus", "sttaus11", null, "2022-10-10 17:11:34"),
  (2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
  (2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")

I need to add a new column that gets a value only on rows where fieldmname is "jobstage". That value should be the latest status for the jobstage, found by looking at the following rows; when selecting it, only rows whose coltype is "status" should be considered.

Expected dataframe:

+-----+----------+---------+-------+-------------------+-------------+
|jobid|fieldmname|new_value|coltype|           createat|latest_status|
+-----+----------+---------+-------+-------------------+-------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|      sttaus2|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|             |
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|     sttaus10|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|             |
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|             |
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|             |
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|      sttaus2|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|             |
+-----+----------+---------+-------+-------------------+-------------+
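To pin down the rule the expected dataframe implies, here is a plain-Python sketch (no Spark involved). It assumes "latest status" means the first later row of the same jobid, ordered by createat, whose coltype is "status"; that reading reproduces every latest_status value in the expected dataframe. The function name latest_status and the tuple layout are hypothetical, chosen only for illustration.

```python
def latest_status(rows):
    """rows: list of (jobid, fieldmname, new_value, coltype, createat) tuples.

    Returns one latest_status value per input row, in input order.
    Assumption: a jobstage row takes the new_value of the FIRST later row
    in the same jobid whose coltype is "status"; all other rows get "".
    """
    # Visit rows grouped by jobid and ordered by createat (like a window
    # partitioned by jobid and ordered by createat)
    order = sorted(range(len(rows)), key=lambda i: (rows[i][0], rows[i][4]))
    result = [""] * len(rows)
    for pos, i in enumerate(order):
        jobid, fieldmname = rows[i][0], rows[i][1]
        if fieldmname != "jobstage":
            continue
        # Scan forward within the same job for the first "status" row
        for j in order[pos + 1:]:
            if rows[j][0] != jobid:
                break
            if rows[j][3] == "status":
                result[i] = rows[j][2]
                break
    return result


print(latest_status([
    (1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
    (1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
]))
# ['sttaus2', '']
```

The inner forward scan is exactly what a single lead() cannot express: the matching status row may be several rows ahead, as with sttaus3 and sttaus10.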

I tried lead, lag and row_number, but did not get the expected result.

Answer:

The question is tagged , so here is a way to do this in PySpark using the lead() window function.

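import pyspark.sql.functions as func
from pyspark.sql.window import Window as wd

# rows of each job, ordered by creation time
w = wd.partitionBy('jobid').orderBy('createat')

data_sdf. \
    withColumn('latest',
               # take the next row's new_value when the next row's coltype is 'status'
               func.when(func.lead('coltype').over(w) == 'status',
                         func.lead('new_value').over(w)
                         ).
               otherwise(func.lit(''))
               ). \
    show()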

# +-----+----------+---------+-------+-------------------+--------+
# |jobid|fieldmname|new_value|coltype|           createat|  latest|
# +-----+----------+---------+-------+-------------------+--------+
# |    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34| sttaus2|
# |    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|        |
# |    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|        |
# |    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|sttaus10|
# |    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|        |
# |    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|        |
# |    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34| sttaus2|
# |    2| jobstatus|  sttaus2| status|2022-11-10 12:11:34|        |
# +-----+----------+---------+-------+-------------------+--------+

So, if the next row's coltype is "status", that next row's new_value (fetched with lead()) is used; otherwise the column gets an empty string.
