I want to relabel healthy examples (0) as failures (1) for the 2 days before the actual failure, for every serial number that has a failure in the failure column. Here is my code:
import findspark
findspark.init()

from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark  # in a notebook, this displays the session summary

# Register the remote tab-separated file with Spark, then read it with a header row
url = "https://gist.githubusercontent.com/JishanAhmed2019/e464ca4da5c871428ca9ed9264467aa0/raw/da3921c1953fefbc66dddc3ce238dac53142dba8/failure.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("failure.csv"), header=True, sep='\t')
I want to relabel the 0s marked in red as 1. Also, serial C was mistakenly recorded in the database as healthy even after its actual failure.
CodePudding user response:
I would recast the date column as a Timestamp, because this lets you take the difference between any two timestamps, which we will need to do.
Then you can create a new column called failure_dates that contains the date whenever a failure occurs and is null otherwise.
Next, create a new column called 2_days_to_failure: partition by serial_number and take the difference between the max value in the failure_dates column and each date inside the partition to get the number of days to failure. The column is true whenever there are 2 days or fewer to failure.
Finally, we can create a column called failure_relabeled by combining the information from the 2_days_to_failure column and the original failure column.
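import pyspark.sql.functions as F
from pyspark.sql.window import Window

# One window per drive, so the max failure date is computed per serial_number
window = Window.partitionBy("serial_number")

df.withColumn(
    # 'd' is day-of-month; 'D' means day-of-year in Spark datetime patterns
    'date', F.to_timestamp(F.col('date'), 'M/d/yyyy')
).withColumn(
    # Date of the failure on failure rows, null everywhere else
    "failure_dates", F.when(F.col('failure') == 1, F.col('date'))
).withColumn(
    # True when the failure is at most 2 days ahead; rows dated after the failure also satisfy <= 2
    "2_days_to_failure", F.datediff(F.max(F.col('failure_dates')).over(window), F.col('date')) <= 2
).withColumn(
    # A null flag (serial with no failure) falls through to otherwise(0)
    "failure_relabeled", F.when((F.col('2_days_to_failure') | (F.col('failure') == 1)), F.lit(1)).otherwise(F.lit(0))
).orderBy('serial_number', 'date').show()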
+-------------------+-------------+-------+-----------+-------------+-------------------+-----------------+-----------------+
|               date|serial_number|failure|smart_5_raw|smart_187_raw|      failure_dates|2_days_to_failure|failure_relabeled|
+-------------------+-------------+-------+-----------+-------------+-------------------+-----------------+-----------------+
|2014-01-01 00:00:00|            A|      0|          0|           60|               null|            false|                0|
|2014-01-02 00:00:00|            A|      0|          0|          180|               null|            false|                0|
|2014-01-03 00:00:00|            A|      0|          0|          140|               null|             true|                1|
|2014-01-04 00:00:00|            A|      0|          0|          280|               null|             true|                1|
|2014-01-05 00:00:00|            A|      1|          0|          400|2014-01-05 00:00:00|             true|                1|
|2014-01-01 00:00:00|            B|      0|          0|           40|               null|             null|                0|
|2014-01-02 00:00:00|            B|      0|          0|          160|               null|             null|                0|
|2014-01-03 00:00:00|            B|      0|          0|          100|               null|             null|                0|
|2014-01-04 00:00:00|            B|      0|          0|          320|               null|             null|                0|
|2014-01-05 00:00:00|            B|      0|          0|          340|               null|             null|                0|
|2014-01-06 00:00:00|            B|      0|          0|          400|               null|             null|                0|
|2014-01-01 00:00:00|            C|      0|          0|           80|               null|             true|                1|
|2014-01-02 00:00:00|            C|      0|          0|          200|               null|             true|                1|
|2014-01-03 00:00:00|            C|      1|          0|          120|2014-01-03 00:00:00|             true|                1|
|2014-01-04 00:00:00|            D|      0|          0|          300|               null|             null|                0|
|2014-01-05 00:00:00|            D|      0|          0|          360|               null|             null|                0|
+-------------------+-------------+-------+-----------+-------------+-------------------+-----------------+-----------------+
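If you want to sanity-check the relabeling rule without a Spark session, here is a minimal plain-Python sketch of the same logic. It assumes rows are (serial_number, date, failure) tuples; `relabel` and `horizon_days` are hypothetical names used only for illustration and are not part of PySpark. The sample rows are serial A and the first two rows of serial B from the output above.

```python
from datetime import date

def relabel(rows, horizon_days=2):
    """Return a 0/1 label per row, mirroring the windowed Spark logic.

    rows: list of (serial_number, date, failure) tuples (assumed layout).
    """
    # Latest failure date per serial, like max(failure_dates) over the window
    last_failure = {}
    for serial, day, failure in rows:
        if failure == 1 and (serial not in last_failure or day > last_failure[serial]):
            last_failure[serial] = day

    labels = []
    for serial, day, failure in rows:
        fail_day = last_failure.get(serial)
        # Same test as datediff(max_failure_date, date) <= 2; a negative
        # difference (row dated after the failure) also passes this check
        within = fail_day is not None and (fail_day - day).days <= horizon_days
        labels.append(1 if (within or failure == 1) else 0)
    return labels

rows = [
    ("A", date(2014, 1, 1), 0),
    ("A", date(2014, 1, 2), 0),
    ("A", date(2014, 1, 3), 0),
    ("A", date(2014, 1, 4), 0),
    ("A", date(2014, 1, 5), 1),
    ("B", date(2014, 1, 1), 0),
    ("B", date(2014, 1, 2), 0),
]

labels = relabel(rows)
print(labels)  # [0, 0, 1, 1, 1, 0, 0]
```

Because the comparison is only `<= 2`, a row recorded after the failure date would also be labeled 1 under this rule. If that is not what you want, you would need to require the difference to be non-negative as well.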