I have the following information in a table:
| equipment | run | runend    | failure | removal_date |
|-----------|-----|-----------|---------|--------------|
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    |
I would like to create an extra column that holds a countdown to the failure point, so something that looks like this:
| equipment | run | runend    | failure | removal_date | RUL |
|-----------|-----|-----------|---------|--------------|-----|
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     | 3   |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     | 2   |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     | 1   |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     | 0   |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    | 16  |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    | 10  |
So basically a countdown where each row counts down towards the runend that is closest to the removal_date shown in the table.
I think this can be achieved with a window function. I have managed to add a column that counts all of the rows for an equipment, but I am stuck on how to narrow that window down and then get a count where the first row takes the last count and works backwards. This is what I have so far:
from pyspark.sql import Window
from pyspark.sql.functions import asc, col, rank

w = Window.partitionBy("equipment", "run").orderBy(asc("runend"))
df = df.withColumn("rank", rank().over(w))

# Just to see what the df looks like
df.where(col("equipment") == "A").groupby("equipment", "run", "rank", "failure", "runend", "removal_date").count().orderBy("equipment", "runend").show()
So I get a table that looks like this. I think I am on the right track, but I am still missing some parts:
| equipment | run | runend    | failure | removal_date | rank |
|-----------|-----|-----------|---------|--------------|------|
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     | 1    |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     | 2    |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     | 3    |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     | 4    |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    | 5    |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    | 6    |
Answer:
This can be done with a simple `datediff`; there is no need for a window function here. Following is the code.
>>> from pyspark.sql import functions as f
>>> df1 = spark.createDataFrame([
("A",1,"1/1/2021",0,"4/1/2021"),
("A",2,"2/1/2021",0,"4/1/2021"),
("A",3,"3/1/2021",0,"4/1/2021"),
("A",4,"4/1/2021",1,"4/1/2021"),
("A",5,"4/1/2021",0,"20/1/2021"),
("A",6,"10/1/2021",0,"20/1/2021")
], schema=["equipment","run","runend","failure","removal_date"]
)
>>> df1.show()
+---------+---+---------+-------+------------+
|equipment|run|   runend|failure|removal_date|
+---------+---+---------+-------+------------+
|        A|  1| 1/1/2021|      0|    4/1/2021|
|        A|  2| 2/1/2021|      0|    4/1/2021|
|        A|  3| 3/1/2021|      0|    4/1/2021|
|        A|  4| 4/1/2021|      1|    4/1/2021|
|        A|  5| 4/1/2021|      0|   20/1/2021|
|        A|  6|10/1/2021|      0|   20/1/2021|
+---------+---+---------+-------+------------+
>>> df2 = df1.withColumn("runend", f.to_date(f.col("runend"), "d/M/yyyy")) \
.withColumn("removal_date", f.to_date(f.col("removel_date"), "d/M/yyyy"))
>>> df2.show()
+---------+---+----------+-------+------------+
|equipment|run|    runend|failure|removal_date|
+---------+---+----------+-------+------------+
|        A|  1|2021-01-01|      0|  2021-01-04|
|        A|  2|2021-01-02|      0|  2021-01-04|
|        A|  3|2021-01-03|      0|  2021-01-04|
|        A|  4|2021-01-04|      1|  2021-01-04|
|        A|  5|2021-01-04|      0|  2021-01-20|
|        A|  6|2021-01-10|      0|  2021-01-20|
+---------+---+----------+-------+------------+
>>> df3 = df2.withColumn("RUL", f.datediff(f.col("removal_date"), f.col("runend")))
>>> df3.show()
+---------+---+----------+-------+------------+---+
|equipment|run|    runend|failure|removal_date|RUL|
+---------+---+----------+-------+------------+---+
|        A|  1|2021-01-01|      0|  2021-01-04|  3|
|        A|  2|2021-01-02|      0|  2021-01-04|  2|
|        A|  3|2021-01-03|      0|  2021-01-04|  1|
|        A|  4|2021-01-04|      1|  2021-01-04|  0|
|        A|  5|2021-01-04|      0|  2021-01-20| 16|
|        A|  6|2021-01-10|      0|  2021-01-20| 10|
+---------+---+----------+-------+------------+---+
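For completeness, the same steps can be condensed into one chained expression starting from the string-typed df1 above. This is just a restatement of the logic already shown (the name df_rul is my own), not a different method:
>>> # Condensed version of the steps above: parse both date columns, then
>>> # take the day difference between removal_date and runend as the RUL.
>>> df_rul = (
...     df1
...     .withColumn("runend", f.to_date(f.col("runend"), "d/M/yyyy"))
...     .withColumn("removal_date", f.to_date(f.col("removal_date"), "d/M/yyyy"))
...     .withColumn("RUL", f.datediff(f.col("removal_date"), f.col("runend")))
... )
>>> df_rul.show()  # should print the same table as df3.show() above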