org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFr

Time:11-30

I'm doing window-based sorting in Spark Structured Streaming:

val filterWindow: WindowSpec = Window
  .partitionBy("key")
  .orderBy($"time")

controlDataFrame = controlDataFrame
  .withColumn("Make Coffee", $"value")
  .withColumn("datetime", date_trunc("second", current_timestamp()))
  .withColumn("time", current_timestamp())
  .withColumn("temp_rank", rank().over(filterWindow))
  .filter(col("temp_rank") === 1)
  .drop("temp_rank")
  .withColumn("digitalTwinId", lit(digitalTwinId))
  .withWatermark("datetime", "10 seconds")

I obtain time from current_timestamp(), and in the schema its type is StructField(time,TimestampType,true).

Why doesn't Spark 3.0 allow a window operation on this column, failing with the exception below, even though the field is clearly time-based?

21/11/22 10:34:03 WARN SparkSession$Builder: Using an existing SparkSession; some spark core configurations may not take effect.

org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [rank(time#163) windowspecdefinition(key#150, time#163 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS temp_rank#171], [key#150], [time#163 ASC NULLS FIRST]
 - Project [key#150, value#151, Make Coffee#154, datetime#158, time#163]

CodePudding user response:

So, according to the answer in https://issues.apache.org/jira/browse/SPARK-37439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel:

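A time-based window is NOT just a Window/WindowSpec ordered by a timestamp column. On a streaming DataFrame you have to apply a time-window function to the timestamp column explicitly: window() for tumbling or sliding windows, or session_window() (available from Spark 3.2) for session windows.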

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#types-of-time-windows
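As a rough sketch of what a time-based window could look like for this case, the example below replaces rank().over(...) with a grouped aggregation over window(). It assumes the goal is "the earliest row per key within each 10-second window". The object name EarliestPerWindow, the helper earliestPerWindow, and the built-in "rate" test source are illustrative choices, not part of the original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, min, struct, window}

object EarliestPerWindow {

  // Hypothetical helper: for every 10-second tumbling window and key,
  // keep the row with the smallest "time". min() over a struct compares
  // its fields left to right, so "time" decides the winner.
  def earliestPerWindow(events: DataFrame): DataFrame =
    events
      .withWatermark("time", "10 seconds")
      .groupBy(window(col("time"), "10 seconds"), col("key"))
      .agg(min(struct(col("time"), col("value"))).as("earliest"))
      .select(
        col("window"),
        col("key"),
        col("earliest.time").as("time"),
        col("earliest.value").as("value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("earliest-per-window")
      .getOrCreate()

    // Assumed input: the "rate" source emits (timestamp, value) rows;
    // a synthetic "key" column is derived from value for the demo.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(
        (col("value") % 3).cast("string").as("key"),
        col("value"),
        col("timestamp").as("time"))

    val query = earliestPerWindow(events).writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

The grouping on window(...) is what makes the operation time-based from Spark's point of view; a WindowSpec with partitionBy/orderBy is rejected on streaming input regardless of the column type.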
