I have the following dataframe in PySpark:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("myApp").getOrCreate()
# Create a list of rows for the DataFrame
rows = [("2011-11-13 11:00", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:01", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:04", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:15", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-12-15 15:00", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
("2011-12-15 15:06", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
("2011-12-15 15:08", None, None, None)]
# Create a DataFrame from the rows
df = spark.createDataFrame(rows, ["timestamp", "myid", "start_timestamp", "end_timestamp"])
# Show the DataFrame
df.show()
The printed output:
+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|2011-11-13 11:00:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:01:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:15:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-12-15 15:00:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:08:00|null|               null|               null|
|2011-12-17 16:00:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+
I need to select only the rows that:
- have null values in "start_timestamp" and "end_timestamp", or
- have the "timestamp" closest to the "start_timestamp" value.
For the above example, the expected result is this one:
+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:08:00|null|               null|               null|
|2011-12-17 16:00:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+
This is my current code, but it gives a wrong result. Moreover, on my real dataset the code fails with the error "Aggregate/Window/Generate expressions are not valid in where clause of the query":
from pyspark.sql import Window
from pyspark.sql.functions import abs, col, min
# Create a window to partition the data by "myid" and order by "timestamp"
window = Window.partitionBy("myid").orderBy("timestamp")
# Add a new column "time_diff" that calculates the absolute difference between "timestamp" and "start_timestamp"
df = df.withColumn("time_diff", abs(col("timestamp").cast("long") - col("start_timestamp").cast("long")))
# Add a new column "min_time_diff" that contains the minimum "time_diff" for each "myid"
df = df.withColumn("min_time_diff", min("time_diff").over(window))
# Select the rows that have null values in "start_timestamp" and "end_timestamp"
# or have the minimum value in "time_diff" for each "myid"
df = df.filter((col("start_timestamp").isNull() & col("end_timestamp").isNull()) |
(col("time_diff") == col("min_time_diff")))
CodePudding user response:
This was my approach:
from pyspark.sql import functions as psf
from pyspark.sql.window import Window
# Create a list of rows for the DataFrame
rows = [("2011-11-13 11:00", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:01", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:04", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:15", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-12-15 15:00", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
("2011-12-15 15:06", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
("2011-12-15 15:08", None, None, None),
("2011-12-17 16:00:00", None, None, None)]
# Create a DataFrame from the rows
df = spark.createDataFrame(rows, ["timestamp", "myid", "start_timestamp", "end_timestamp"])
# cast timestamps
df = df.withColumn(
"timestamp", psf.col("timestamp").cast("timestamp")
).withColumn(
"start_timestamp", psf.col("start_timestamp").cast("timestamp")
).withColumn(
"end_timestamp", psf.col("end_timestamp").cast("timestamp")
)
# as per your code
df = df.withColumn("time_diff", psf.abs(psf.col("timestamp").cast("long") - psf.col("start_timestamp").cast("long")))
# generate row number for selection of min time diff
window_spec = Window.partitionBy('myid').orderBy(psf.col('time_diff'))
df = df.withColumn('rn', psf.row_number().over(window_spec))
# apply selection criteria
df.filter(
(psf.col('start_timestamp').isNull() & psf.col('end_timestamp').isNull())
| (psf.col('rn')==psf.lit(1))
).show()
+-------------------+----+-------------------+-------------------+---------+---+
|          timestamp|myid|    start_timestamp|      end_timestamp|time_diff| rn|
+-------------------+----+-------------------+-------------------+---------+---+
|2011-12-15 15:08:00|null|               null|               null|     null|  1|
|2011-12-17 16:00:00|null|               null|               null|     null|  2|
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|      120|  1|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|       60|  1|
+-------------------+----+-------------------+-------------------+---------+---+
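A side note on the original attempt: because that window spec included an orderBy, min("time_diff").over(window) used the default frame (unbounded preceding to current row) and therefore returned a running minimum rather than the per-"myid" minimum, which is why extra rows came through. Also, row_number keeps exactly one row per "myid" even if two timestamps are equally close to "start_timestamp"; if all tied rows should be kept instead, a min-based variant over a partition-only window should work as well. A sketch, reusing the "time_diff" column built above:
from pyspark.sql import functions as psf
from pyspark.sql.window import Window
# Partition only, no orderBy: min() is computed over the whole partition
w = Window.partitionBy("myid")
df = df.withColumn("min_time_diff", psf.min("time_diff").over(w))
# Null partitions are still caught by the isNull branch, since null == null is not true
df.filter(
    (psf.col("start_timestamp").isNull() & psf.col("end_timestamp").isNull())
    | (psf.col("time_diff") == psf.col("min_time_diff"))
).show()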