I am trying to create a column using a UDF in PySpark.
The code I tried looks like this:
# The function checks the year and adds a multiplied value_column to the final column
def new_column(row, year):
    if year == "2020":
        return row * 0.856
    elif year == "2019":
        return row * 0.8566
    else:
        return row

final_udf = F.udf(lambda z, y: new_column(z, y), Double())  # How do I get the Double datatype here?
res = res.withColumn("final_value", final_udf(F.col('value_column'), F.col('year')))
How can I write Double() in final_udf? I see that for a string we can use StringType(). But what can I do to return a double type in the "final_value" column?
CodePudding user response:
Use a simple string "double", or import PySpark's DoubleType:
# like this
final_udf = F.udf(lambda z, y: new_column(z, y), "double")

# or this
import pyspark.sql.types as T
final_udf = F.udf(lambda z, y: new_column(z, y), T.DoubleType())
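Either way, the UDF is then applied by passing both columns, so that year actually reaches the function (assuming res has value_column and year columns):
res = res.withColumn("final_value", final_udf(F.col('value_column'), F.col('year')))

# A quick way to confirm the column came out as double:
res.printSchema()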
CodePudding user response:
Input:
from pyspark.sql import functions as F, types as T
res = spark.createDataFrame([(1.0, '2020',), (1.0, '2019',), (1.0, '2018',)], ['value_column', 'year'])
udf is very inefficient when dealing with big data. You should first try to do it in native Spark:
res = res.withColumn(
    'final_value',
    F.when(F.col('year') == "2020", F.col('value_column') * 0.856)
     .when(F.col('year') == "2019", F.col('value_column') * 0.8566)
     .otherwise(F.col('value_column'))
)
res.show()
# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
# +------------+----+-----------+
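If you prefer SQL syntax, the same logic can also be written as a CASE expression via F.expr (equivalent to the when/otherwise chain above):
res = res.withColumn(
    'final_value',
    F.expr("""
        CASE year
            WHEN '2020' THEN value_column * 0.856
            WHEN '2019' THEN value_column * 0.8566
            ELSE value_column
        END
    """)
)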
If it's impossible in native Spark, turn to pandas_udf:
from pyspark.sql import functions as F, types as T
import pandas as pd

@F.pandas_udf(T.DoubleType())
def new_column(row: pd.Series, year: pd.Series) -> pd.Series:
    # row and year arrive as whole pandas Series, so the logic must be vectorized
    multiplier = year.map({"2020": 0.856, "2019": 0.8566}).fillna(1.0)
    return row * multiplier
res = res.withColumn("final_value", final_udf('value_column', 'year'))
res.show()
# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
# +------------+----+-----------+
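Since the body of a pandas_udf is plain pandas, you can sanity-check the multiplier logic locally, without Spark, before running it on the cluster (a small check using the same values as the example DataFrame):
import pandas as pd

value = pd.Series([1.0, 1.0, 1.0])
year = pd.Series(['2020', '2019', '2018'])
multiplier = year.map({"2020": 0.856, "2019": 0.8566}).fillna(1.0)
print((value * multiplier).tolist())  # [0.856, 0.8566, 1.0]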
Only as a last resort should you go for udf:
@F.udf(T.DoubleType())
def new_column(row, year):
    if year == "2020":
        return row * 0.856
    elif year == "2019":
        return row * 0.8566
    else:
        return row
res = res.withColumn("final_value", new_column('value_column', 'year'))
res.show()
# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
# +------------+----+-----------+
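One caveat with the plain udf: if value_column can contain nulls, the Python function receives None and None * 0.856 raises a TypeError. A small variant that lets nulls pass through unchanged (assuming that's the behaviour you want):
@F.udf(T.DoubleType())
def new_column(row, year):
    # Let null values pass through instead of failing on None * float
    if row is None:
        return None
    if year == "2020":
        return row * 0.856
    if year == "2019":
        return row * 0.8566
    return row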