Eliminate duplicates (deduplication) in Streaming DataFrame


I have a Spark streaming processor. The DataFrame dfNewExceptions contains duplicates (duplicated by "ExceptionId"). Since this is a streaming dataset, the query below fails:

val dfNewUniqueExceptions = dfNewExceptions
  .sort(desc("LastUpdateTime"))
  .coalesce(1)
  .dropDuplicates("ExceptionId")

val dfNewExceptionCore = dfNewUniqueExceptions.select("ExceptionId", "LastUpdateTime")

dfNewExceptionCore.writeStream
  .format("console")
//  .outputMode("complete")
  .option("truncate", "false")
  .option("numRows", 5000)
  .start()
  .awaitTermination(1000)

Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

This is also documented here: https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-programming-guide.html

Any suggestions on how the duplicates can be removed from dfNewExceptions?

CodePudding user response:

I recommend following the approach explained in the Structured Streaming Programming Guide's section on Streaming Deduplication. It says:

You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as de-duplication on static DataFrames using a unique identifier column. The query will store the necessary amount of data from previous records so that it can filter duplicate records. Similar to aggregations, you can use de-duplication with or without watermarking.

With watermark - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to receive any more duplicates. This bounds the amount of state the query has to maintain.

An example in Scala is also given:

val dfExceptions = spark.readStream. ... // columns: ExceptionId, LastUpdateTime, ...

dfExceptions
  .withWatermark("LastUpdateTime", "10 seconds")
  .dropDuplicates("ExceptionId", "LastUpdateTime")

De-duplication in a stream is a difficult task, especially when LastUpdateTime arrives in random order. Keep in mind the trade-off between a watermark bound long enough to catch late duplicates and the point at which it makes more sense to switch from streaming to batch processing. Also refer to what I have written in another answer on Spark Structured Streaming Deduplication with Watermark.
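Applied to the columns from the question, a minimal sketch might look like this (the 10-minute watermark bound is an assumption; choose it based on how late a duplicate can realistically arrive):

// dfNewExceptions is assumed to be the streaming DataFrame from the question,
// with an ExceptionId column and an event-time column LastUpdateTime.
val dfNewUniqueExceptions = dfNewExceptions
  .withWatermark("LastUpdateTime", "10 minutes")   // assumed lateness bound
  .dropDuplicates("ExceptionId", "LastUpdateTime")

dfNewUniqueExceptions
  .select("ExceptionId", "LastUpdateTime")
  .writeStream
  .format("console")
  .outputMode("append")   // no sort or aggregation, so append mode works
  .option("truncate", "false")
  .option("numRows", 5000)
  .start()
  .awaitTermination()

Note that the sort(desc("LastUpdateTime")) step from the question has to be dropped; sorting an unaggregated streaming DataFrame is not supported in any output mode.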

CodePudding user response:

You can use watermarking to drop duplicates that arrive within a specific timeframe, as in the sketch below.
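If you are on Spark 3.5 or later, there is also Dataset.dropDuplicatesWithinWatermark, which deduplicates on the key columns alone (useful when duplicate events carry different LastUpdateTime values) while still using the watermark to bound the state. A minimal sketch, assuming the same dfNewExceptions stream and a 10-minute lateness bound:

// Spark 3.5+ only. Deduplicates on ExceptionId alone; the watermark determines
// how long each key is kept in state, so a duplicate arriving within the
// watermark delay of the first event is dropped even if its LastUpdateTime differs.
val deduped = dfNewExceptions
  .withWatermark("LastUpdateTime", "10 minutes")   // assumed lateness bound
  .dropDuplicatesWithinWatermark("ExceptionId")

deduped.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()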
