Home > Enterprise >  UDF function for a double datatype in PySpark
UDF function for a double datatype in PySpark

Time:10-25

I am trying to create a column using UDF function in PySpark.

The code I tried looks like this:

# The function checks year and adds a multiplied value_column to the final column

def new_column(row, year):
    if year == "2020":
        return row * 0.856 
    elif year == "2019": 
        return row * 0.8566
    else:
        return row

final_udf = F.udf(lambda z: new_column(z), Double()) #How do I get - Double datatype here 
res = res.withColumn("final_value", final_udf(F.col('value_column'), F.col('year')))

How can I write Double() in final_udf? I see that for string we can use StringType(). But what can I do to return double type in the "final_value" column?

CodePudding user response:

Use a simple string "double" or import pypspark's DoubleType

# like this
final_udf = F.udf(lambda z: new_column(z), "double")

# or this
import pyspark.sql.types as T
final_udf = F.udf(lambda z: new_column(z), T.DoubleType())

CodePudding user response:

Input:

from pyspark.sql import functions as F, types as T
res = spark.createDataFrame([(1.0, '2020',), (1.0, '2019',), (1.0, '2018',)], ['value_column', 'year'])

udf is very inefficient when dealing with big data.

You should first try to do it in native Spark:

res = res.withColumn(
    'final_value',
    F.when(F.col('year') == "2020", F.col('value_column') * 0.856)
     .when(F.col('year') == "2019", F.col('value_column') * 0.8566)
     .otherwise(F.col('value_column'))
)
res.show()
#  ------------ ---- ----------- 
# |value_column|year|final_value|
#  ------------ ---- ----------- 
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
#  ------------ ---- ----------- 

If it's impossible in native Spark, turn to pandas_udf:

from pyspark.sql import functions as F, types as T
import pandas as pd

@F.pandas_udf(T.DoubleType())
def new_column(row: pd.Series, year: pd.Series) -> pd.Series:
    if year == "2020":
        return row * 0.856 
    elif year == "2019": 
        return row * 0.8566
    else:
        return row

res = res.withColumn("final_value", final_udf('value_column', 'year'))

res.show()
#  ------------ ---- ----------- 
# |value_column|year|final_value|
#  ------------ ---- ----------- 
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
#  ------------ ---- ----------- 

Only as a last resort you should go for udf:

@F.udf(T.DoubleType())
def new_column(row, year):
    if year == "2020":
        return row * 0.856 
    elif year == "2019": 
        return row * 0.8566
    else:
        return row

res = res.withColumn("final_value", new_column('value_column', 'year'))

res.show()
#  ------------ ---- ----------- 
# |value_column|year|final_value|
#  ------------ ---- ----------- 
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
#  ------------ ---- ----------- 
  • Related