This is very weird. I tried using a pandas UDF on a Spark DataFrame, and it only works if I do a select and return a single value, which is the average of the column.
But if I try to fill the whole column with this value, it doesn't work.
The following works:
@pandas_udf(DoubleType())
def avg(col):
    cl = np.average(col)
    return cl

df.select(avg('col'))
This works and returns a DataFrame with one row containing the average of the column.
But the following doesn't work:
df.withColumn('avg', F.lit(avg(col)))
Why? If avg(col) is a value, why can't I use it with lit() to fill the column?
Unlike the following example, which does work when I return a constant number:
@pandas_udf(DoubleType())
def avg(col):
    return 5

df.withColumn('avg', avg(col))
I also tried returning a Series, and that didn't work either:
@pandas_udf(DoubleType())
def avg(col):
    cl = np.average(col)
    return pd.Series([cl] * col.size)

df.withColumn('avg', avg(col))
This doesn't work either, but it does work if I use a constant instead of cl.
So basically: how can I fill a whole column with the average of that column?
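For what it's worth, running the same function on a plain pandas Series outside Spark does what I expect, so the function body itself seems fine (note that Series.size is an attribute, not a method):

```python
# Testing the UDF body locally with plain pandas (no Spark involved).
# The logic works on a single Series: it returns a Series of the same
# length filled with that Series' average. Inside Spark, though, the
# UDF is called once per batch of rows, not once for the whole column.
import numpy as np
import pandas as pd

def avg(col: pd.Series) -> pd.Series:
    cl = np.average(col)
    return pd.Series([cl] * col.size)  # .size is an attribute, not a method

batch = pd.Series([1.0, 2.0, 3.0, 4.0])
print(avg(batch).tolist())  # [2.5, 2.5, 2.5, 2.5]
```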
CodePudding user response:
lit is evaluated on the driver, not on the data on the executors. The best way to achieve this is to define a window spec covering the entire dataset and call the built-in aggregate function over that window. This eliminates the need for an extra UDF.
windowSpec = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('avg', F.avg('col').over(windowSpec))
CodePudding user response:
Type cast it to float().
I am not sure what you are trying to achieve here. A plain UDF is called once per row, so inside the UDF, "col" represents an individual cell value, not the entire column.
If your column is of type array/list:
df = spark.createDataFrame(
    [
        [[1.0, 2.0, 3.0, 4.0]],
        [[5.0, 6.0, 7.0, 8.0]],
    ],
    ["num"]
)

@F.udf(returnType=DoubleType())
def avg(col):
    import numpy as np
    return float(np.average(col))

df = df.withColumn("avg", avg("num"))
+--------------------+---+
|                 num|avg|
+--------------------+---+
|[1.0, 2.0, 3.0, 4.0]|2.5|
|[5.0, 6.0, 7.0, 8.0]|6.5|
+--------------------+---+
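On why the float() cast is there: np.average returns a NumPy scalar (np.float64), not a Python float, and some Spark versions fail to serialize NumPy scalars from a UDF declared as DoubleType. A quick check outside Spark:

```python
# np.average yields a NumPy float64 scalar, not a built-in Python float;
# casting with float() gives the plain type that Spark's DoubleType
# serializer reliably accepts from a UDF.
import numpy as np

result = np.average([1.0, 2.0, 3.0, 4.0])
print(type(result).__name__)         # float64 (NumPy scalar)
print(type(float(result)).__name__)  # float   (built-in Python float)
```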
But if your column is a scalar type like double/float, then averaging it via the UDF just returns each cell's own value back:
df = spark.createDataFrame(
    [[1.0], [2.0], [3.0], [4.0]],
    ["num"]
)

@F.udf(returnType=DoubleType())
def avg(col):
    import numpy as np
    return float(np.average(col))

df = df.withColumn("avg", avg("num"))
+---+---+
|num|avg|
+---+---+
|1.0|1.0|
|2.0|2.0|
|3.0|3.0|
|4.0|4.0|
+---+---+