I have the below DataFrame:
ID  Name   Dept
1   John   ABC
2   Rio    BCD
3   Marry  BCD
4   Andy   BCD
5   Smith  PQR
6   Rich   XYZ
7   Lisa   LMN
8   Steve  LMN
9   Ali    STU
We can see that in the Dept column, BCD is repeated 3 times and LMN is repeated 2 times.
Now I need to create a new column Dept_Updated and check for consecutive values: if a value repeats consecutively, append an underscore and the sequence number after it; if it is not repeated consecutively, leave it as it is.
I need the output in the below format.
ID  Name   Dept  Dept_Updated
1   John   ABC   ABC
2   Rio    BCD   BCD_1
3   Marry  BCD   BCD_2
4   Andy   BCD   BCD_3
5   Smith  PQR   PQR
6   Rich   XYZ   XYZ
7   Lisa   LMN   LMN_1
8   Steve  LMN   LMN_2
9   Ali    STU   STU
I am very new to PySpark; if there is any way to achieve the above output, it would be really helpful.
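For reference, the sample DataFrame can be created like below (a minimal sketch; it assumes an existing SparkSession and plain integer/string columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data from the table above
data = [(1, "John", "ABC"), (2, "Rio", "BCD"), (3, "Marry", "BCD"),
        (4, "Andy", "BCD"), (5, "Smith", "PQR"), (6, "Rich", "XYZ"),
        (7, "Lisa", "LMN"), (8, "Steve", "LMN"), (9, "Ali", "STU")]

df = spark.createDataFrame(data, ["ID", "Name", "Dept"])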
CodePudding user response:
We need a window partitioned by Dept, and we also need to check for consecutive entries only. For that I propose something like the below, which checks whether the previous row has the same Dept as the current row and appends the Rnk column (the row number within the Dept partition) only for entries that have consecutive duplicates:
from pyspark.sql import functions as F, Window as W

w = W.orderBy("ID")                        # overall row order, used to look at the previous row
w1 = W.partitionBy("Dept").orderBy("ID")   # deterministic numbering within each Dept, ordered by ID
w2 = W.partitionBy("Dept")                 # unordered window: full count of rows per Dept

# append Rnk when the previous row has the same Dept,
# or when this is the first row of a Dept that occurs more than once
condition = F.col("Check_Duplicate") | ((F.col("CheckLength") > 1) & (F.col("Rnk") == 1))

new_df = (df.withColumn("Check_Duplicate", F.col("Dept") == F.lag("Dept").over(w))
            .withColumn("Rnk", F.row_number().over(w1))
            .withColumn("CheckLength", F.count("Dept").over(w2))
            .withColumn("Dept_Updated", F.when(condition, F.concat_ws("_", "Dept", "Rnk"))
                                          .otherwise(F.col("Dept"))))

new_df.select(*df.columns, "Dept_Updated").orderBy("ID").show()
Output:
+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
|  1| John| ABC|         ABC|
|  2|  Rio| BCD|       BCD_1|
|  3|Marry| BCD|       BCD_2|
|  4| Andy| BCD|       BCD_3|
|  5|Smith| PQR|         PQR|
|  6| Rich| XYZ|         XYZ|
|  7| Lisa| LMN|       LMN_1|
|  8|Steve| LMN|       LMN_2|
|  9|  Ali| STU|         STU|
+---+-----+----+------------+
Test to show that in case the Dept is repeated but not consecutively, the code does not append the row number:
+---+-----+----+------------+
| ID| Name|Dept|Dept_Updated|
+---+-----+----+------------+
|  1| John| ABC|         ABC|
|  2|  Rio| BCD|       BCD_1|
|  3|Marry| BCD|       BCD_2|
|  4| Andy| BCD|       BCD_3|
|  5|Smith| PQR|         PQR|
|  6| Rich| BCD|         BCD| # <-- This entry is repeated but not consecutive
|  7| Lisa| LMN|       LMN_1|
|  8|Steve| LMN|       LMN_2|
|  9|  Ali| STU|         STU|
+---+-----+----+------------+
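For completeness: the window above numbers rows per Dept across the whole column, so if the same Dept formed two separate consecutive runs, the counter would not restart for the second run. If that case matters, a gaps-and-islands style grouping can restart the numbering for each run; a rough sketch of that idea:

from pyspark.sql import functions as F, Window as W

w = W.orderBy("ID")   # single global window; fine for small data

# a new run starts whenever Dept differs from the previous row's Dept
runs = (df.withColumn("new_run", (F.col("Dept") != F.lag("Dept").over(w)).cast("int"))
          .fillna(1, subset=["new_run"])                    # the very first row starts a run
          .withColumn("run_id", F.sum("new_run").over(w)))  # cumulative sum labels each consecutive run

w_run = W.partitionBy("run_id").orderBy("ID")
result = (runs.withColumn("run_len", F.count("Dept").over(W.partitionBy("run_id")))
              .withColumn("rn", F.row_number().over(w_run))
              .withColumn("Dept_Updated",
                          F.when(F.col("run_len") > 1, F.concat_ws("_", "Dept", "rn"))
                           .otherwise(F.col("Dept")))
              .select("ID", "Name", "Dept", "Dept_Updated"))

result.orderBy("ID").show()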
CodePudding user response:
You can use a window specification to create a row number for each record within its partition (here we partition on the Dept column) and then append it to the Dept column in the new Dept_Updated column.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, concat, col, lit

# order by ID within each Dept so the row numbers follow the original row order
windowSpec = Window.partitionBy("Dept").orderBy("ID")

(df.withColumn("row_number", row_number().over(windowSpec))
   .withColumn("Dept_Updated", concat(col("Dept"), lit("_"), col("row_number")))
   .drop("row_number")
   .show())
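Note that, as written, this appends the row number to every department (for example ABC becomes ABC_1), whereas the asked-for output leaves departments that appear only once unchanged. One way to gate the concatenation is a count over the same Dept partition; a rough sketch of that idea (it numbers repeated departments even when they are not consecutive):

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, count, concat_ws, col, when

windowSpec = Window.partitionBy("Dept").orderBy("ID")
deptWindow = Window.partitionBy("Dept")   # unordered: full count of rows per Dept

(df.withColumn("row_number", row_number().over(windowSpec))
   .withColumn("dept_count", count("Dept").over(deptWindow))
   .withColumn("Dept_Updated",
               when(col("dept_count") > 1, concat_ws("_", col("Dept"), col("row_number")))
               .otherwise(col("Dept")))
   .drop("row_number", "dept_count")
   .orderBy("ID")
   .show())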