I have a dataframe looking like this:
| id | device   | x  | y  | z  | timestamp                    |
|----|----------|----|----|----|------------------------------|
| 1  | device_1 | 22 | 8  | 23 | 2020-10-30T16:00:00.000+0000 |
| 1  | device_1 | 21 | 88 | 65 | 2020-10-30T16:01:00.000+0000 |
| 1  | device_1 | 33 | 34 | 64 | 2020-10-30T16:02:00.000+0000 |
| 2  | device_2 | 12 | 6  | 97 | 2019-11-30T13:00:00.000+0000 |
| 2  | device_2 | 44 | 77 | 13 | 2019-11-30T13:00:00.000+0000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:00:00.000+0000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:01:00.000+0000 |
The data represents events for an "id" at a certain point in time. I would like to see the development of the values over a period of time, for instance to plot a time series.
I'm thinking of adding a column 'duration' which is 0 for the first entry and then, for each following entry, the elapsed time since the first entry for the same id on the same day (there might be multiple separate event streams for the same id on different days).
I would ideally want a dataframe looking something like this:
| id | device   | x  | y  | z  | timestamp                    | duration     |
|----|----------|----|----|----|------------------------------|--------------|
| 1  | device_1 | 22 | 8  | 23 | 2020-10-30T16:00:00.000+0000 | 00:00:00.000 |
| 1  | device_1 | 21 | 88 | 65 | 2020-10-30T16:01:00.000+0000 | 00:01:00.000 |
| 1  | device_1 | 33 | 34 | 64 | 2020-10-30T16:02:00.000+0000 | 00:02:00.000 |
| 2  | device_2 | 12 | 6  | 97 | 2019-11-30T13:00:00.000+0000 | 00:00:00.000 |
| 2  | device_2 | 44 | 77 | 13 | 2019-11-30T13:00:30.000+0000 | 00:00:30.000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:00:00.000+0000 | 00:00:00.000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:01:00.000+0000 | 00:01:00.000 |
I have no idea where to begin in order to achieve this, so either a good explanation or a code example would be very helpful!
Any other suggestions on how to plot development over time (in general, not tied to a specific date or time of day) based on this dataframe are also very welcome.
Note: It has to be in PySpark (not pandas) since the dataset is extremely large.
CodePudding user response:
Your problem can be resolved using window functions, as below:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data; the trailing "+0000" is the UTC offset of the ISO-8601 strings
df = spark.createDataFrame(
    [
        (1, 'device_1', 22, 8, 23, '2020-10-30T16:00:00.000+0000'),
        (1, 'device_1', 21, 88, 65, '2020-10-30T16:01:00.000+0000'),
        (1, 'device_1', 33, 34, 64, '2020-10-30T16:02:00.000+0000'),
        (2, 'device_2', 12, 6, 97, '2019-11-30T13:00:00.000+0000'),
        (2, 'device_2', 44, 77, 13, '2019-11-30T13:00:00.000+0000'),
        (1, 'device_1', 22, 11, 30, '2022-10-30T08:00:00.000+0000'),
        (1, 'device_1', 22, 11, 30, '2022-10-30T08:01:00.000+0000'),
    ],
    ("id", "device_name", "x", "y", "z", "timestmp"))

df.show(5, False)
+---+-----------+---+---+---+----------------------------+
|id |device_name|x  |y  |z  |timestmp                    |
+---+-----------+---+---+---+----------------------------+
|1  |device_1   |22 |8  |23 |2020-10-30T16:00:00.000+0000|
|1  |device_1   |21 |88 |65 |2020-10-30T16:01:00.000+0000|
|1  |device_1   |33 |34 |64 |2020-10-30T16:02:00.000+0000|
|2  |device_2   |12 |6  |97 |2019-11-30T13:00:00.000+0000|
|2  |device_2   |44 |77 |13 |2019-11-30T13:00:00.000+0000|
+---+-----------+---+---+---+----------------------------+
from pyspark.sql.functions import *

# parse the ISO-8601 string into a proper timestamp, and extract the
# calendar date so that same-day streams can be grouped together
df_1 = df.withColumn("timestmp_t", to_timestamp(col("timestmp")))
df_1 = df_1.withColumn("date_t", to_date(substring(col("timestmp"), 1, 10)))

df_1.show(5)
+---+-----------+---+---+---+--------------------+-------------------+----------+
| id|device_name|  x|  y|  z|            timestmp|         timestmp_t|    date_t|
+---+-----------+---+---+---+--------------------+-------------------+----------+
|  1|   device_1| 22|  8| 23|2020-10-30T16:00:...|2020-10-30 16:00:00|2020-10-30|
|  1|   device_1| 21| 88| 65|2020-10-30T16:01:...|2020-10-30 16:01:00|2020-10-30|
|  1|   device_1| 33| 34| 64|2020-10-30T16:02:...|2020-10-30 16:02:00|2020-10-30|
|  2|   device_2| 12|  6| 97|2019-11-30T13:00:...|2019-11-30 13:00:00|2019-11-30|
|  2|   device_2| 44| 77| 13|2019-11-30T13:00:...|2019-11-30 13:00:00|2019-11-30|
+---+-----------+---+---+---+--------------------+-------------------+----------+
df_1.createOrReplaceTempView("tmp_table")

spark.sql("""
select t.*, (timestmp_t - min) as duration from (
    SELECT id, device_name, date_t, timestmp_t,
           MIN(timestmp_t) OVER (PARTITION BY id, date_t ORDER BY timestmp_t) AS min
    FROM tmp_table) as t
""").show(5, False)
+---+-----------+----------+-------------------+-------------------+-----------------------------------+
|id |device_name|date_t    |timestmp_t         |min                |duration                           |
+---+-----------+----------+-------------------+-------------------+-----------------------------------+
|1  |device_1   |2020-10-30|2020-10-30 16:00:00|2020-10-30 16:00:00|INTERVAL '0 00:00:00' DAY TO SECOND|
|1  |device_1   |2020-10-30|2020-10-30 16:01:00|2020-10-30 16:00:00|INTERVAL '0 00:01:00' DAY TO SECOND|
|1  |device_1   |2020-10-30|2020-10-30 16:02:00|2020-10-30 16:00:00|INTERVAL '0 00:02:00' DAY TO SECOND|
|1  |device_1   |2022-10-30|2022-10-30 08:00:00|2022-10-30 08:00:00|INTERVAL '0 00:00:00' DAY TO SECOND|
|1  |device_1   |2022-10-30|2022-10-30 08:01:00|2022-10-30 08:00:00|INTERVAL '0 00:01:00' DAY TO SECOND|
+---+-----------+----------+-------------------+-------------------+-----------------------------------+
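The duration column above is left as an interval type. If you would rather have a plain number of seconds (often handier for plotting), here is a minimal variation of the same query; unix_timestamp does the conversion, and the column name duration_s is just an assumption:

spark.sql("""
select t.*,
       -- seconds since the first event of the day (column name is an assumption)
       unix_timestamp(timestmp_t) - unix_timestamp(min) as duration_s from (
    SELECT id, device_name, date_t, timestmp_t,
           MIN(timestmp_t) OVER (PARTITION BY id, date_t ORDER BY timestmp_t) AS min
    FROM tmp_table) as t
""").show(5, False)

Note that unix_timestamp truncates to whole seconds; if you need millisecond precision, cast the timestamps to double instead before subtracting.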
CodePudding user response:
You will need to use window functions (functions that operate inside partitions defined with an over clause). The code below does the same thing as the other answer, but I wanted to show a more streamlined version, fully in the PySpark DataFrame API as opposed to Spark SQL with subqueries.
Initially, the column "duration" will be of type interval, so it's up to you to transform it into whatever data type you need. I have just extracted the HH:mm:ss part using regexp_extract, which stores it as a string.
Input (I assume your "timestamp" column is of type timestamp):
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [(1, 'device_1', 22, 8, 23, '2020-10-30T16:00:00.000+0000'),
     (1, 'device_1', 21, 88, 65, '2020-10-30T16:01:00.000+0000'),
     (1, 'device_1', 33, 34, 64, '2020-10-30T16:02:00.000+0000'),
     (2, 'device_2', 12, 6, 97, '2019-11-30T13:00:00.000+0000'),
     (2, 'device_2', 44, 77, 13, '2019-11-30T13:00:30.000+0000'),
     (1, 'device_1', 22, 11, 30, '2022-10-30T08:00:00.000+0000'),
     (1, 'device_1', 22, 11, 30, '2022-10-30T08:01:00.000+0000')],
    ["id", "device", "x", "y", "z", "timestamp"]
).withColumn("timestamp", F.to_timestamp("timestamp"))
Script:
# one window per id per calendar day, ordered by event time
w = W.partitionBy('id', F.to_date('timestamp')).orderBy('timestamp')

# subtracting the day's first timestamp yields an interval column
df = df.withColumn('duration', F.col('timestamp') - F.min('timestamp').over(w))

# keep just the HH:mm:ss part as a string
df = df.withColumn('duration', F.regexp_extract('duration', r'\d\d:\d\d:\d\d', 0))
df.show(truncate=0)
# +---+--------+---+---+---+-------------------+--------+
# |id |device  |x  |y  |z  |timestamp          |duration|
# +---+--------+---+---+---+-------------------+--------+
# |1  |device_1|22 |8  |23 |2020-10-30 16:00:00|00:00:00|
# |1  |device_1|21 |88 |65 |2020-10-30 16:01:00|00:01:00|
# |1  |device_1|33 |34 |64 |2020-10-30 16:02:00|00:02:00|
# |1  |device_1|22 |11 |30 |2022-10-30 08:00:00|00:00:00|
# |1  |device_1|22 |11 |30 |2022-10-30 08:01:00|00:01:00|
# |2  |device_2|12 |6  |97 |2019-11-30 13:00:00|00:00:00|
# |2  |device_2|44 |77 |13 |2019-11-30 13:00:30|00:00:30|
# +---+--------+---+---+---+-------------------+--------+
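As for plotting development over time in general (not tied to a wall-clock date): once the duration is numeric, every event stream shares the same x-axis starting at 0. A sketch under the assumption that matplotlib is available on the driver and that any single slice you plot is small enough to collect; offset_s is a hypothetical column name:

# numeric offset in seconds since the day's first event (column name is an assumption)
df = df.withColumn(
    'offset_s',
    F.unix_timestamp('timestamp') - F.min(F.unix_timestamp('timestamp')).over(w))

# collect only the small slice you want to plot; the full dataset stays in Spark
pdf = df.filter(F.col('id') == 1).select('offset_s', 'x', 'y', 'z').toPandas()
pdf.plot(x='offset_s')  # pandas plotting, requires matplotlib on the driver

Filtering before toPandas is the important part here: the window computation runs distributed over the full dataset, and only the rows you actually want to draw ever leave the cluster.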