Accumulator in Spark Scala: Counter value is wrong when calculated in a filter and used with withColumn


I'm trying to count the number of valid and invalid records in a file. Below is the code I'm using:

val badDataCountAcc = spark.sparkContext.longAccumulator("BadDataAcc")
val goodDataCountAcc = spark.sparkContext.longAccumulator("GoodDataAcc")

val dataframe = spark
      .read
      .format("csv")
      .option("header", true)
      .option("inferSchema", true)
      .load(path)
      .filter(data => {
        val matcher = regex.matcher(data.toString())
        if (matcher.find()) {
          goodDataCountAcc.add(1)
          println("GoodDataCountAcc: "   goodDataCountAcc.value)
          true
        } else {
          badDataCountAcc.add(1)
          println("BadDataCountAcc: "   badDataCountAcc.value)
          false
        }
      }
      )
     .withColumn("FileName", input_file_name())


dataframe.show()
val filename = dataframe
      .select("FileName")
      .distinct()

val name = filename.collectAsList().get(0).toString()
println(""   filename)


println("Bad data Count Acc: "   badDataCountAcc.value)
println("Good data Count Acc: "   goodDataCountAcc.value)

I ran this code on sample data that has 2 valid and 3 invalid records. Inside the filter, where I print the counts, the values are correct. But outside the filter, when I print the final counts, I get 4 for good data and 6 for bad data.

Questions:

  • When I remove the withColumn statement at the end - along with the code that computes the distinct filename - the values are printed correctly. I'm not sure why.
  • I also have a requirement to get the input filename. What would be the best way to do that here?

CodePudding user response:

First of all, accumulators belong to the RDD API, while you are using DataFrames. DataFrames are compiled down to RDDs in the end, but they sit at a higher level of abstraction. In this context it is better to use aggregations than accumulators.

From the Spark Accumulators documentation:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:
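
The code fragment referenced in the quote was omitted above. A minimal sketch of the property it describes, along the lines of the docs' example (assuming a SparkContext sc and an existing RDD data):

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0, because no action has forced the map to run.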

Your DataFrame filter will be compiled to an RDD filter, which is not an action but a transformation (and thus lazy), so this only-once guarantee does not hold in your case. How many times your code is executed is implementation-dependent, and may change between Spark versions, so you should not rely on it.
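
You can reproduce the doubled counts with a minimal sketch (a hypothetical spark-shell session, with no caching in play): each action re-runs the lazy filter, so the accumulator is incremented once per row per action.

val acc = spark.sparkContext.longAccumulator("acc")
val ds = spark.range(5).filter { x => acc.add(1); true }

println(acc.value)  // 0 - the filter is lazy, nothing has run yet
ds.count()          // first action: the filter runs for every row
println(acc.value)  // 5
ds.count()          // second action: the filter runs again
println(acc.value)  // 10 - the same doubling you observed

With your own filter, two actions give exactly 2 x 2 = 4 good and 3 x 2 = 6 bad.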

Regarding your two questions:

  1. This cannot be answered from your code snippet, because it doesn't contain any actions. Is it even the exact snippet you use? I suspect that if you executed the code you posted as-is (apart from adding the missing imports), it would print 0 both times, because nothing is ever executed. Either way, you should always assume that an accumulator inside an RDD transformation may be executed multiple times (or not at all, if it sits in a DataFrame operation that can be optimized away).

  2. Your approach of using withColumn is perfectly fine.

I'd suggest using DataFrame expressions and aggregations (or the equivalent Spark SQL, if you prefer that). The regex matching can be done with rlike on the individual columns instead of relying on toString(), e.g. .withColumn("IsGoodData", $"myColumn1".rlike(regex1) && $"myColumn2".rlike(regex2)).

Then you can count the good and bad records with an aggregation like dataframe.groupBy($"IsGoodData").count(), as in the sketch below.
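
Putting it together, a minimal sketch of the whole approach (the column names myColumn1/myColumn2 and the regex values are placeholders for your actual schema and patterns; path is your input path):

import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

val df = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load(path)
  // flag each row instead of counting it as a side effect
  .withColumn("IsGoodData", $"myColumn1".rlike(regex1) && $"myColumn2".rlike(regex2))
  .withColumn("FileName", input_file_name())

// one aggregation yields both counts, independent of how often
// (or whether) any transformation is re-executed
df.groupBy($"IsGoodData").count().show()

This also keeps the FileName column available for your second requirement.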

EDIT: With the additional lines, the answer to your first question is also clear: the first execution was triggered by dataframe.show() and the second by filename.collectAsList() - which you probably also removed, since it depends on the added column. Please make sure you understand the distinction between Spark transformations and actions, and Spark's lazy evaluation model. Otherwise you won't be very happy with it :-)
