I have a dataframe as shown below,
+-----+----------+---------+-------+-------------------+
|jobid|fieldmname|new_value|coltype|           createat|
+-----+----------+---------+-------+-------------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|
+-----+----------+---------+-------+-------------------+
Seq(
(1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
(1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
(1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
(1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
(1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
(1, "jobstatus", "sttaus11", "null", "2022-10-10 17:11:34"),
(2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
(2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")
I need to add a new column whose value is populated only for rows where fieldmname is "jobstage". The value should be the next status for that jobstage, taken from the rows that follow it, and while selecting it only rows whose coltype is "status" should count.
Expected dataframe:
+-----+----------+---------+-------+-------------------+-------------+
|jobid|fieldmname|new_value|coltype|           createat|latest_status|
+-----+----------+---------+-------+-------------------+-------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|      sttaus2|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|             |
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|     sttaus10|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|             |
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|             |
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|             |
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|      sttaus2|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|             |
+-----+----------+---------+-------+-------------------+-------------+
I tried with lead, lag, and row_number, but I am not getting the expected result.
CodePudding user response:
The question is tagged pyspark, so here is a way to do the required in PySpark using the lead() window function.
from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

w = wd.partitionBy('jobid').orderBy('createat')

data_sdf. \
    withColumn('latest',
               # take the next row's new_value only when that row's coltype is "status"
               func.when(func.lead('coltype').over(w) == 'status',
                         func.lead('new_value').over(w)
                         ).
               otherwise(func.lit(''))
               ). \
    show()
# +-----+----------+---------+-------+-------------------+--------+
# |jobid|fieldmname|new_value|coltype|           createat|  latest|
# +-----+----------+---------+-------+-------------------+--------+
# |    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34| sttaus2|
# |    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|        |
# |    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|        |
# |    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|sttaus10|
# |    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|        |
# |    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|        |
# |    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34| sttaus2|
# |    2| jobstatus|  sttaus2| status|2022-11-10 12:11:34|        |
# +-----+----------+---------+-------+-------------------+--------+
So, if the next record's coltype column has the value "status", that record's (lead) new_value column's value is used.