Filter rows conditionally in PySpark: Aggregate/Window/Generate expressions are not valid in where clause


I have the following dataframe in PySpark:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("myApp").getOrCreate()

# Create a list of rows for the DataFrame
rows = [("2011-11-13 11:00", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:01", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:04", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:15", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-12-15 15:00", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:06", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:08", None, None, None)]

# Create a DataFrame from the rows
df = spark.createDataFrame(rows, ["timestamp", "myid", "start_timestamp", "end_timestamp"])

# Show the DataFrame
df.show()

The printed output:

+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|2011-11-13 11:00:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:01:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:15:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-12-15 15:00:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:08:00|null|               null|               null|
|2011-12-17 16:00:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+

I need to select only the rows that:

  1. have null values in "start_timestamp" and "end_timestamp", or
  2. have the "timestamp" closest to "start_timestamp" within each "myid" group.

For the above example, the expected result is this one:

+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:08:00|null|               null|               null|
|2011-12-17 16:00:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+

This is my current code, but it gives the wrong result. Moreover, on my real dataset it fails with the error "Aggregate/Window/Generate expressions are not valid in where clause of the query":

from pyspark.sql import Window
from pyspark.sql.functions import abs, col, min

# Create a window to partition the data by "myid" and order by "timestamp"
window = Window.partitionBy("myid").orderBy("timestamp")

# Add a new column "time_diff" that calculates the absolute difference between "timestamp" and "start_timestamp"
df = df.withColumn("time_diff", abs(col("timestamp").cast("long") - col("start_timestamp").cast("long")))

# Add a new column "min_time_diff" that contains the minimum "time_diff" for each "myid"
df = df.withColumn("min_time_diff", min("time_diff").over(window))

# Select the rows that have null values in "start_timespamp" and "end_timestamp"
# or have the minimum value in "time_diff" for each "myid"
df = df.filter((col("start_timestamp").isNull() & col("end_timestamp").isNull()) | 
              (col("time_diff") == col("min_time_diff")))

CodePudding user response:

This was my approach:

from pyspark.sql import functions as psf
from pyspark.sql.window import Window

# Create a list of rows for the DataFrame
rows = [("2011-11-13 11:00", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:01", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:04", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:15", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-12-15 15:00", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:06", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:08", None, None, None),
        ("2011-12-17 16:00:00", None, None, None)]

# Create a DataFrame from the rows
df = spark.createDataFrame(rows, ["timestamp", "myid", "start_timestamp", "end_timestamp"])

# cast timestamps
df = df.withColumn(
        "timestamp", psf.col("timestamp").cast("timestamp")
    ).withColumn(
        "start_timestamp", psf.col("start_timestamp").cast("timestamp")
    ).withColumn(
        "end_timestamp", psf.col("end_timestamp").cast("timestamp")
    )

# as per your code 
df = df.withColumn("time_diff", psf.abs(psf.col("timestamp").cast("long") - psf.col("start_timestamp").cast("long")))

# generate row number for selection of min time diff
window_spec = Window.partitionBy('myid').orderBy(psf.col('time_diff'))
df = df.withColumn('rn', psf.row_number().over(window_spec))

# apply selection criteria
df.filter(
    (psf.col('start_timestamp').isNull() & psf.col('end_timestamp').isNull()) 
    | (psf.col('rn')==psf.lit(1))
).show()

+-------------------+----+-------------------+-------------------+---------+---+
|          timestamp|myid|    start_timestamp|      end_timestamp|time_diff| rn|
+-------------------+----+-------------------+-------------------+---------+---+
|2011-12-15 15:08:00|null|               null|               null|     null|  1|
|2011-12-17 16:00:00|null|               null|               null|     null|  2|
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|      120|  1|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|       60|  1|
+-------------------+----+-------------------+-------------------+---------+---+
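
To match the expected output exactly, the helper columns can be dropped after the filter; a small follow-up sketch (the name result is illustrative):

result = df.filter(
    (psf.col('start_timestamp').isNull() & psf.col('end_timestamp').isNull())
    | (psf.col('rn') == psf.lit(1))
).drop('time_diff', 'rn')

result.show()

Using row_number over a window ordered by time_diff also sidesteps the where-clause error, because the window result is materialised as an ordinary column ('rn') before the filter runs.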