Using pandas udf to return a full column containing average

Time:10-27

This is very weird. I tried using a pandas UDF on a Spark DataFrame. It only works if I use select and return a single value, which is the average of the column.

But if I try to fill the whole column with this value, it doesn't work.

The following works:

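import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# Series -> scalar type hints make this a grouped-aggregate pandas UDF (Spark 3.x)
@pandas_udf(DoubleType())
def avg(col: pd.Series) -> float:
    cl = np.average(col)
    return cl

df.select(avg('col'))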

This works and returns a DataFrame with one row, which contains the average of the column.

But the following doesn't work:

df.withColumn('avg', F.lit(avg('col')))

Why? If avg('col') is a value, why can't I use it with lit() to fill the column?

By contrast, the following example does work when I return a constant number:

@pandas_udf(DoubleType())
def avg(col):
    return 5

df.withColumn('avg', avg('col'))

I also tried returning a Series, and that didn't work either:

@pandas_udf(DoubleType())
def avg(col):
    cl = np.average(col)
    return pd.Series([cl] * col.size())

df.withColumn('avg', avg('col'))

This doesn't work, but it does work if I use a constant instead of cl.

So basically, how can I return a full column in which every row contains the average of the column?

Answer:

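lit builds a constant expression from a value that is already known on the driver. It is not executed against the data on the executors. avg('col') is not such a value. It is an unevaluated aggregate expression, so lit cannot turn it into a constant. The best way to achieve this is to define a window spec that covers the entire dataset and call the aggregate function over that window. This also removes the need for an extra UDF.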

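from pyspark.sql import Window, functions as F

windowSpec = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('avg', F.avg('col').over(windowSpec))

Note that a window without partitionBy moves all rows into a single partition. Spark logs a warning about this, and it can be slow on large data.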

Answer:

Cast the result to float() before returning it.

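I am not sure what you are trying to achieve here. A regular UDF (F.udf) is called once for each row, so inside the UDF, "col" is a single cell value. It does not represent the entire column. A scalar pandas UDF is called once per batch instead. There, "col" is a pandas Series holding one batch of rows, and that batch is still not guaranteed to be the whole column.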
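To make the batch behaviour concrete, here is a plain-pandas simulation of the question's Series-returning UDF. It does not need Spark. The column values and the split into two batches of three rows are assumptions for illustration. In Spark, the real batch boundaries depend on partitioning and on spark.sql.execution.arrow.maxRecordsPerBatch. The simulation also shows why col.size() fails on its own: Series.size is an int property, not a method.

```python
import numpy as np
import pandas as pd

# Hypothetical column values; the true average is 30 / 6 = 5.0.
column = pd.Series([1.0, 2.0, 3.0, 4.0, 8.0, 12.0])

# Series.size is an int property, so calling it as in the question fails.
try:
    column.size()
except TypeError as exc:
    size_error = str(exc)  # "'int' object is not callable"

# Assumption: Spark hands the UDF two batches of three rows each.
batches = [column.iloc[0:3], column.iloc[3:6]]


def avg_udf_body(col: pd.Series) -> pd.Series:
    """The question's UDF body, using len(col) instead of col.size()."""
    cl = np.average(col)
    return pd.Series([cl] * len(col))


# Each call only sees its own batch, so each batch gets its own average.
per_batch = pd.concat([avg_udf_body(b) for b in batches], ignore_index=True)

# What the question wants: one global average repeated on every row.
global_avg = pd.Series([np.average(column)] * len(column))

print(per_batch.tolist())   # [2.0, 2.0, 2.0, 8.0, 8.0, 8.0]
print(global_avg.tolist())  # [5.0, 5.0, 5.0, 5.0, 5.0, 5.0]
```

Each batch gets its own average, so a scalar pandas UDF cannot reliably produce the column-wide value. That requires an aggregate, either over a window or as a separate aggregation step.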

If your column is an array type, the UDF can average each row's array:

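from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        [[1.0, 2.0, 3.0, 4.0]],
        [[5.0, 6.0, 7.0, 8.0]],
    ],
    ["num"]
)


# Regular (non-pandas) UDF: called once per row with that row's array
@F.udf(returnType=DoubleType())
def avg(col):
    import numpy as np
    return float(np.average(col))


df = df.withColumn("avg", avg("num"))
df.show()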

+--------------------+---+
|                 num|avg|
+--------------------+---+
|[1.0, 2.0, 3.0, 4.0]|2.5|
|[5.0, 6.0, 7.0, 8.0]|6.5|
+--------------------+---+

But if your column is a scalar type such as double/float, a per-row UDF just returns each cell's own value, because the average of a single number is that number:

df = spark.createDataFrame(
    [[1.0], [2.0], [3.0], [4.0]],
    ["num"]
)


@F.udf(returnType=DoubleType())
def avg(col):
    import numpy as np
    return float(np.average(col))


df = df.withColumn("avg", avg("num"))
df.show()

+---+---+
|num|avg|
+---+---+
|1.0|1.0|
|2.0|2.0|
|3.0|3.0|
|4.0|4.0|
+---+---+