I have a DataFrame df as shown below:
VehNum   Control_circuit  control_circuit_status  partnumbers  errors          Flag
4234456  DOC              ok                      A567UR       Software Issue  0
4234456  DOC              not_okay                A568UR       Software Issue  1
4234456  DOC              not_okay                A569UR       Hardware issue  2
4234457  ACR              ok                      A234TY       Hardware issue  0
4234457  ACR              ok                      A235TY       Hardware issue  0
4234457  ACR              ok                      A234TY       Hardware issue  0
4234487  QWR              ok                      A276TY       Hardware issue  0
4234487  QWR              not_okay                A872UR       Hardware issue  1
3423448  QWR              not_okay                A872UR       Hardware issue  1
I want to add a new column called "Control_Flag", computed as follows: for each combination of VehNum and Control_circuit, if any row in that group has control_circuit_status "ok", then Control_Flag is 0; otherwise it is 1.
The result should look like this:
VehNum   Control_circuit  control_circuit_status  partnumbers  errors          Flag  Control_Flag
4234456  DOC              ok                      A567UR       Software Issue  0     0
4234456  DOC              not_okay                A568UR       Software Issue  1     0
4234456  DOC              not_okay                A569UR       Hardware issue  2     0
4234457  ACR              ok                      A234TY       Hardware issue  0     0
4234457  ACR              ok                      A235TY       Hardware issue  0     0
4234457  ACR              ok                      A234TY       Hardware issue  0     0
4234487  QWR              ok                      A276TY       Hardware issue  0     1
4234487  QWR              not_okay                A872UR       Hardware issue  1     1
3423448  QWR              not_okay                A872UR       Hardware issue  1     1
How can I achieve this using PySpark?
CodePudding user response:
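Here's a solution: mark each "ok" row with 1, sum those marks over a window partitioned by VehNum and Control_circuit, and set Control_Flag to 0 when the sum is greater than 0, otherwise 1.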
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# Reuse the active session, or start one when running outside a notebook/shell.
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ("4234456", "DOC", "ok", "A567UR", "Software Issue", 0),
        ("4234456", "DOC", "not_okay", "A568UR", "Software Issue", 1),
        ("4234456", "DOC", "not_okay", "A569UR", "Hardware Issue", 2),
        ("4234457", "ACR", "ok", "A234TY", "Hardware Issue", 0),
        ("4234457", "ACR", "ok", "A234TY", "Hardware Issue", 0),
        ("4234457", "ACR", "ok", "A234TY", "Hardware Issue", 0),
        ("4234487", "QWR", "ok", "A276TY", "Hardware Issue", 0),
        ("4234487", "QWR", "not_okay", "A872UR", "Hardware Issue", 1),
        ("3423448", "QWR", "not_okay", "A872UR", "Hardware Issue", 1),
    ],
    ["VehNum", "Control_circuit", "control_circuit_status", "partnumbers", "errors", "Flag"],
)

# Rows that share the same vehicle and control circuit form one group.
df_agg_window = Window.partitionBy("VehNum", "Control_circuit")

df = (
    df
    # 1 for an "ok" row, 0 for anything else (including "not_okay").
    .withColumn(
        "cc_status",
        F.when(F.lower(F.col("control_circuit_status")) == "ok", F.lit(1)).otherwise(F.lit(0)),
    )
    # Number of "ok" rows in the group.
    .withColumn("flag_sum", F.sum("cc_status").over(df_agg_window))
    # 0 if the group has at least one "ok" row, otherwise 1.
    .withColumn(
        "Control_Flag",
        F.when(F.col("flag_sum") > 0, F.lit(0)).otherwise(F.lit(1)),
    )
    # Remove the helper columns.
    .drop("cc_status", "flag_sum")
)

df.show()
Output:
+-------+---------------+----------------------+-----------+--------------+----+------------+
| VehNum|Control_circuit|control_circuit_status|partnumbers|        errors|Flag|Control_Flag|
+-------+---------------+----------------------+-----------+--------------+----+------------+
|4234457|            ACR|                    ok|     A234TY|Hardware Issue|   0|           0|
|4234457|            ACR|                    ok|     A234TY|Hardware Issue|   0|           0|
|4234457|            ACR|                    ok|     A234TY|Hardware Issue|   0|           0|
|4234487|            QWR|              not_okay|     A872UR|Hardware Issue|   1|           0|
|4234487|            QWR|                    ok|     A276TY|Hardware Issue|   0|           0|
|4234456|            DOC|                    ok|     A567UR|Software Issue|   0|           0|
|4234456|            DOC|              not_okay|     A569UR|Hardware Issue|   2|           0|
|4234456|            DOC|              not_okay|     A568UR|Software Issue|   1|           0|
|3423448|            QWR|              not_okay|     A872UR|Hardware Issue|   1|           1|
+-------+---------------+----------------------+-----------+--------------+----+------------+
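If you want to sanity-check the rule on a small sample without a Spark session, the same "any ok in the group" logic can be sketched in plain Python. This is only an illustrative check, not part of the Spark job: the function name control_flags and the trimmed rows list (three of the original columns, fewer rows) are assumptions made for the example.

```python
from collections import defaultdict

# Trimmed sample: (VehNum, Control_circuit, control_circuit_status).
rows = [
    ("4234456", "DOC", "ok"),
    ("4234456", "DOC", "not_okay"),
    ("4234456", "DOC", "not_okay"),
    ("4234457", "ACR", "ok"),
    ("4234487", "QWR", "ok"),
    ("4234487", "QWR", "not_okay"),
    ("3423448", "QWR", "not_okay"),
]


def control_flags(rows):
    """Return one flag per row: 0 if the row's group contains an "ok", else 1."""
    # First pass: record which (VehNum, Control_circuit) groups contain "ok".
    has_ok = defaultdict(bool)
    for veh, circuit, status in rows:
        has_ok[(veh, circuit)] |= status.lower() == "ok"
    # Second pass: every row inherits the flag of its group.
    return [0 if has_ok[(veh, circuit)] else 1 for veh, circuit, _ in rows]


flags = control_flags(rows)
print(flags)
```

Only the last row gets flag 1 here, because vehicle 3423448 has no "ok" row for QWR, which agrees with the Spark output above.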