Using Accumulator inside Pyspark UDF


I want to access an accumulator inside a PySpark UDF:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

accum = spark.sparkContext.accumulator(0)

def prob(g, s):
    if g == 'M':
        accum.add(1)
        return 1
    else:
        accum.add(2)
        return accum.value

convertUDF = udf(lambda g, s: prob(g, s), IntegerType())

The error I am getting:

  raise Exception("Accumulator.value cannot be accessed inside tasks")
Exception: Accumulator.value cannot be accessed inside tasks

Please let me know how to access the accumulator value, and how we can change it, inside a PySpark UDF.

CodePudding user response:

You cannot access the .value of the accumulator in the udf. From the documentation (see this answer too):

Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using value.

It is unclear why you need to return accum.value in this case. Judging by your if block, you likely only need to return 2 in the else block:

def prob(g,s):
    if g=='M':
        accum.add(1)
        return 1
    else:
        accum.add(2)
        return 2