Count rows in a window in a given date range pyspark


I have the following information in a table (dates are day/month/year):

| equipment | run | runend    | failure | removal_date |
|-----------|-----|-----------|---------|--------------|
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    |

I would like to create an extra column with a countdown to the failure point, something like this:

| equipment | run | runend    | failure | removal_date | RUL |
|-----------|-----|-----------|---------|--------------|-----|
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     | 3   |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     | 2   |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     | 1   |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     | 0   |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    | 16  |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    | 10  |

So essentially each row should show how far its runend is from the removal_date shown in the table, reaching 0 at the run closest to the removal date.

I think this can be achieved with a window function. I have managed to add a column that counts all of the rows for an equipment, but I am stuck on how to narrow this window down and then have the count run backwards, so that the first row takes the last count (see the sketch after the output table below). This is what I have so far:

from pyspark.sql import Window
from pyspark.sql.functions import asc, col, rank

w = Window.partitionBy("equipment").orderBy(asc("runend"))  # partition by equipment only, not by run
df = df.withColumn("rank", rank().over(w))

# Just to see what the df looks like
(df.where(col("equipment") == "A")
   .groupby("equipment", "run", "rank", "failure", "runend", "removal_date")
   .count()
   .orderBy("equipment", "runend")
   .show())

So I get a table that looks like this. I think I am on the right track but am still missing some parts:

| equipment | run | runend    | failure | removal_date | rank |
|-----------|-----|-----------|---------|--------------|------|
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     | 1    |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     | 2    |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     | 3    |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     | 4    |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    | 5    |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    | 6    |
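For what it's worth, here is a minimal sketch of one way the window could be narrowed, assuming removal_date marks the end of each failure cycle and that runend has already been parsed to a real date column (both assumptions, not confirmed anywhere above). Partitioning by equipment and removal_date together makes each failure cycle its own window, and numbering the rows from the latest runend backwards gives a countdown that reaches 0 at the failure row:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Each (equipment, removal_date) pair is one failure cycle, so partitioning
# on both narrows the window to a single cycle; runend must be a date column
# for the descending sort to be chronological.
w2 = Window.partitionBy("equipment", "removal_date") \
           .orderBy(col("runend").desc(), col("run").desc())
df = df.withColumn("rows_to_failure", row_number().over(w2) - 1)

Note that this counts rows, not calendar days, so it matches the desired RUL only while runs happen daily; the answer below gets the day count directly from the dates.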

CodePudding user response:

This can be done with the simple datediff function; no window is needed, since removal_date is already present on every row. Here is the code.

>>> from pyspark.sql import functions as f

>>> df1 = spark.createDataFrame([
        ("A",1,"1/1/2021",0,"4/1/2021"),
        ("A",2,"2/1/2021",0,"4/1/2021"),
        ("A",3,"3/1/2021",0,"4/1/2021"),
        ("A",4,"4/1/2021",1,"4/1/2021"),
        ("A",5,"4/1/2021",0,"20/1/2021"),
        ("A",6,"10/1/2021",0,"20/1/2021")
    ], schema=["equipment","run","runend","failure","removal_date"]
)

>>> df1.show()
+---------+---+---------+-------+------------+
|equipment|run|   runend|failure|removal_date|
+---------+---+---------+-------+------------+
|        A|  1| 1/1/2021|      0|    4/1/2021|
|        A|  2| 2/1/2021|      0|    4/1/2021|
|        A|  3| 3/1/2021|      0|    4/1/2021|
|        A|  4| 4/1/2021|      1|    4/1/2021|
|        A|  5| 4/1/2021|      0|   20/1/2021|
|        A|  6|10/1/2021|      0|   20/1/2021|
+---------+---+---------+-------+------------+

>>> df2 = df1.withColumn("runend", f.to_date(f.col("runend"), "d/M/yyyy")) \
        .withColumn("removal_date", f.to_date(f.col("removal_date"), "d/M/yyyy"))

>>> df2.show()
+---------+---+----------+-------+------------+
|equipment|run|    runend|failure|removal_date|
+---------+---+----------+-------+------------+
|        A|  1|2021-01-01|      0|  2021-01-04|
|        A|  2|2021-01-02|      0|  2021-01-04|
|        A|  3|2021-01-03|      0|  2021-01-04|
|        A|  4|2021-01-04|      1|  2021-01-04|
|        A|  5|2021-01-04|      0|  2021-01-20|
|        A|  6|2021-01-10|      0|  2021-01-20|
+---------+---+----------+-------+------------+

>>> df3 = df2.withColumn("RUL", f.datediff(f.col("removal_date"), f.col("runend")))

>>> df3.show()
+---------+---+----------+-------+------------+---+
|equipment|run|    runend|failure|removal_date|RUL|
+---------+---+----------+-------+------------+---+
|        A|  1|2021-01-01|      0|  2021-01-04|  3|
|        A|  2|2021-01-02|      0|  2021-01-04|  2|
|        A|  3|2021-01-03|      0|  2021-01-04|  1|
|        A|  4|2021-01-04|      1|  2021-01-04|  0|
|        A|  5|2021-01-04|      0|  2021-01-20| 16|
|        A|  6|2021-01-10|      0|  2021-01-20| 10|
+---------+---+----------+-------+------------+---+
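
As an aside, if removal_date were not already stored on every row, a window could still derive it from the failure flag, which is closer to what the question was reaching for. This is an illustrative sketch, assuming run numbers increase over time and that every cycle is closed by a row with failure = 1; the column names cycle and derived_removal are made up for the example:

from pyspark.sql import Window
from pyspark.sql import functions as f

# Walk each equipment's runs from newest to oldest; a running sum of the
# failure flag assigns every run to the failure cycle that ends it.
w_rev = Window.partitionBy("equipment").orderBy(f.col("run").desc())
w_cycle = Window.partitionBy("equipment", "cycle")

df4 = (
    df2.withColumn("cycle", f.sum("failure").over(w_rev))
        # Within a cycle, the removal date is the runend of its failure row.
        .withColumn("derived_removal",
                    f.max(f.when(f.col("failure") == 1, f.col("runend"))).over(w_cycle))
        # Runs after the last recorded failure have no failure row in their
        # cycle, so derived_removal (and hence RUL) stays null for them.
        .withColumn("RUL", f.datediff(f.col("derived_removal"), f.col("runend")))
)

On the sample data this reproduces RUL 3, 2, 1, 0 for runs 1 to 4 and leaves runs 5 and 6 null, since their failure row (on 20/1/2021) is not in the table.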