Home > database >  PySpark structured streaming with window get earliest & latest record based upon a timestamp value
PySpark structured streaming with window get earliest & latest record based upon a timestamp value

Time:06-23

I have a structured streaming process that reads from a deltalake. The data contains values that are increasing with time. Within each window, I would like to get the (difference between the) earliest and the latest record based upon the timestamp of the records within that window.

Values are like

sensor_id |TimeStamp       |Value
sensor_1  |Jun 16 10:10:01 |65534
sensor_1  |Jun 16 10:10:02 |65535
sensor_1  |Jun 16 10:10:03 |0
sensor_1  |Jun 16 10:10:04 |1
...
sensor_1  |Jun 16 10:10:59 |567

I want to retrieve the earliest (Jun 16 10:10:01, 65534) and the latest value (Jun 16 10:10:59, 567) for each window

Silver = (Bronze 
    .withWatermark("TimeStamp", "1 minute") 
    .groupBy(['sensor_id', F.window('TimeStamp', '1 minute')])
    .agg(
         F.last(F.col('value')).alias('lastvalue'), 
         F.first(F.col('value')).alias('firstvalue'), 
         F.last(F.col('TimeStamp')).alias('lastTimeStamp'),
         F.first(F.col('TimeStamp')).alias('firstTimeStamp')
         )
)

The problem is that the order is non deterministic https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.last.html

So the 'last' record is not necessarily the one with the latest timestamp. Is there a way to select the value of the earliest record, and the latest record based upon the TimeStamp within the record?

Sorting when using structured streaming does not seem to work. Another possibility would be to use the lag function & sum the result - but also there haven't found any working examples using structured streaming.

CodePudding user response:

Spark 3.0 has max_by and min_by, which in your case can be well used.

from pyspark.sql import functions as F
Bronze = spark.createDataFrame(
    [('sensor_1', '2022-02-02 10:10:01', 65534),
     ('sensor_1', '2022-02-02 10:10:02', 65535),
     ('sensor_1', '2022-02-02 10:10:03', 0),
     ('sensor_1', '2022-02-02 10:10:04', 1),
     ('sensor_1', '2022-02-02 10:11:02', 2),
     ('sensor_1', '2022-02-02 10:11:04', 4),
     ('sensor_1', '2022-02-02 10:10:59', 567)],
    ['sensor_id', 'TimeStamp', 'Value'])

Silver = (Bronze 
    .withWatermark("TimeStamp", "1 minute")
    .groupBy(['sensor_id', F.window('TimeStamp', '1 minute')])
    .agg(
         F.expr("max_by(value, TimeStamp)").alias('lastvalue'),
         F.expr("min_by(value, TimeStamp)").alias('firstvalue'),
         F.max('TimeStamp').alias('lastTimeStamp'),
         F.min('TimeStamp').alias('firstTimeStamp')
    )
)
Silver.show()
#  --------- -------------------- --------- ---------- ------------------- ------------------- 
# |sensor_id|              window|lastvalue|firstvalue|      lastTimeStamp|     firstTimeStamp|
#  --------- -------------------- --------- ---------- ------------------- ------------------- 
# | sensor_1|{2022-02-02 10:10...|      567|     65534|2022-02-02 10:10:59|2022-02-02 10:10:01|
# | sensor_1|{2022-02-02 10:11...|        4|         2|2022-02-02 10:11:04|2022-02-02 10:11:02|
#  --------- -------------------- --------- ---------- ------------------- ------------------- 

Older Spark versions could do it using window functions.

from pyspark.sql import functions as F, Window as W
partition = ['sensor_id', F.window('TimeStamp', '1 minute')]
w_desc = W.partitionBy(partition).orderBy(F.desc('Timestamp'))
w_asc = W.partitionBy(partition).orderBy('Timestamp')
Silver = (Bronze 
    .withWatermark("TimeStamp", "1 minute")
    .withColumn('lastvalue', F.first('Value').over(w_desc))
    .withColumn('lastTimeStamp', F.first('TimeStamp').over(w_desc))
    .withColumn('firstvalue', F.first('Value').over(w_asc))
    .withColumn('firstTimeStamp', F.first('TimeStamp').over(w_asc))
    .groupBy(*partition)
    .agg(
         F.first('lastvalue').alias('lastvalue'), 
         F.first('firstvalue').alias('firstvalue'), 
         F.first('lastTimeStamp').alias('lastTimeStamp'),
         F.first('firstTimeStamp').alias('firstTimeStamp')
    )
)
  • Related