I want to access accumulator inside pyspark udf :
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
accum=spark.sparkContext.accumulator(0)
def prob(g,s):
if g=='M':
accum.add(1)
return 1
else:
accum.add(2)
return accum.value
convertUDF = udf(lambda g,s : prob(g,s),IntegerType())
problem i am getting :
raise Exception("Accumulator.value cannot be accessed inside tasks")
Exception: Accumulator.value cannot be accessed inside tasks
Please let me know how to access accumulator value and how can we change it inside Pyspark UDF .
CodePudding user response:
You cannot access the .value
of the accumulator in the udf. From the documentation (see this answer too):
Worker tasks on a Spark cluster can add values to an Accumulator with the = operator, but only the driver program is allowed to access its value, using value.
It is unclear why you need to return accum.value
in this case. I believe you only need to return 2
in the else
block looking at your if
block:
def prob(g,s):
if g=='M':
accum.add(1)
return 1
else:
accum.add(2)
return 2