How to find consecutive values in a PySpark DataFrame column and replace them


I have the DataFrame below:

ID    Name     Dept
1     John     ABC
2     Rio      BCD
3     Marry    BCD
4     Andy     BCD
5     Smith    PQR
6     Rich     XYZ
7     Lisa     LMN
8     Steve    LMN
9     Ali      STU

We can see that in the Dept column, BCD is repeated 3 times and LMN is repeated 2 times.

Now I need to create a new column, Dept_Updated, and check for consecutive values: if a value repeats consecutively, append an underscore and a running number to it; if it is not a consecutive repeat, leave it as it is.

I need the output in the format below.

ID    Name     Dept   Dept_Updated
1     John     ABC        ABC
2     Rio      BCD        BCD_1
3     Marry    BCD        BCD_2
4     Andy     BCD        BCD_3
5     Smith    PQR        PQR
6     Rich     XYZ        XYZ
7     Lisa     LMN        LMN_1
8     Steve    LMN        LMN_2
9     Ali      STU        STU

I am very new to PySpark. Is there any way to achieve the above output? It would be really helpful.
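
For reference, a minimal sketch to recreate the sample DataFrame above (assuming an active SparkSession named spark):

# Recreate the sample data shown above (assumes an active SparkSession named `spark`)
data = [
    (1, "John", "ABC"), (2, "Rio", "BCD"), (3, "Marry", "BCD"),
    (4, "Andy", "BCD"), (5, "Smith", "PQR"), (6, "Rich", "XYZ"),
    (7, "Lisa", "LMN"), (8, "Steve", "LMN"), (9, "Ali", "STU"),
]
df = spark.createDataFrame(data, ["ID", "Name", "Dept"])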

CodePudding user response:

We need a window partitioned by Dept, and we also need to check for consecutive entries only. For that I propose something like the code below, which checks whether a row's Dept matches the previous row's Dept (in ID order) and only appends the Rnk column (the running row number within the department) for entries that are part of a consecutive run:

from pyspark.sql import functions as F, Window as W

# Window over the whole frame ordered by ID, used to compare each row with the previous one
w = W.orderBy("ID")
# Per-department window ordered by ID, used for the running number (Rnk)
w1 = W.partitionBy("Dept").orderBy("ID")
# Per-department window without ordering, used for the total count per department
w2 = W.partitionBy("Dept")

# Append the suffix when the row repeats the previous row's Dept,
# or when it is the first row of a Dept that occurs more than once
condition = F.col("Check_Duplicate") | ((F.col("CheckLength") > 1) & (F.col("Rnk") == 1))

new_df = df.withColumn("Check_Duplicate", F.col("Dept") == F.lag("Dept").over(w)) \
    .withColumn("Rnk", F.row_number().over(w1)) \
    .withColumn("CheckLength", F.count("Dept").over(w2)) \
    .withColumn("Dept_Updated", F.when(condition, F.concat_ws("_", "Dept", "Rnk"))
                                 .otherwise(F.col("Dept")))

new_df.select(*df.columns, "Dept_Updated").orderBy("ID").show()

Output:

+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
|  1| John| ABC|         ABC|
|  2|  Rio| BCD|       BCD_1|
|  3|Marry| BCD|       BCD_2|
|  4| Andy| BCD|       BCD_3|
|  5|Smith| PQR|         PQR|
|  6| Rich| XYZ|         XYZ|
|  7| Lisa| LMN|       LMN_1|
|  8|Steve| LMN|       LMN_2|
|  9|  Ali| STU|         STU|
+---+-----+----+------------+

A test to show that, in case the Dept is repeated but not consecutively, the code does not append the row number (the modified input is sketched after the output):

+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
|  1| John| ABC|         ABC|
|  2|  Rio| BCD|       BCD_1|
|  3|Marry| BCD|       BCD_2|
|  4| Andy| BCD|       BCD_3|
|  5|Smith| PQR|         PQR|
|  6| Rich| BCD|         BCD| # <-- This entry is repeated but not consecutive
|  7| Lisa| LMN|       LMN_1|
|  8|Steve| LMN|       LMN_2|
|  9|  Ali| STU|         STU|
+---+-----+----+------------+
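
For reference, this test used the same data with the Dept of ID 6 changed from XYZ to BCD. A minimal sketch of that test input, assuming the same spark.createDataFrame pattern as above:

# Same sample data, but ID 6 now has Dept "BCD" instead of "XYZ"
test_data = [
    (1, "John", "ABC"), (2, "Rio", "BCD"), (3, "Marry", "BCD"),
    (4, "Andy", "BCD"), (5, "Smith", "PQR"), (6, "Rich", "BCD"),
    (7, "Lisa", "LMN"), (8, "Steve", "LMN"), (9, "Ali", "STU"),
]
test_df = spark.createDataFrame(test_data, ["ID", "Name", "Dept"])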

CodePudding user response:

You can use a WindowSpec to create a row number for each record within a partition (here we partition on the Dept column) and then concatenate it with the Dept column in the new Dept_Updated column.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, concat, col, lit

# Number each row within its department, in ID order
windowSpec = Window.partitionBy("Dept").orderBy("ID")

df.withColumn("row_number", row_number().over(windowSpec)) \
  .withColumn("Dept_Updated", concat(col("Dept"), lit("_"), col("row_number"))) \
  .drop("row_number") \
  .show()
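
Note that this version appends the row number to every department, including those that appear only once (for example, ABC becomes ABC_1). A minimal sketch of one way to leave such departments untouched, reusing the windowSpec above and adding a per-department count (the count/when pieces are an addition, not part of the original answer):

from pyspark.sql.functions import count, when

# Total number of rows per department (no ordering, so the whole partition is counted)
dept_count = count("Dept").over(Window.partitionBy("Dept"))

df.withColumn("row_number", row_number().over(windowSpec)) \
  .withColumn("Dept_Updated",
              when(dept_count > 1, concat(col("Dept"), lit("_"), col("row_number")))
              .otherwise(col("Dept"))) \
  .drop("row_number") \
  .show()

Unlike the first answer, this still numbers repeats even when they are not consecutive.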