I am trying to convert a Python function to a PySpark user-defined function as below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col,array
from pyspark.sql.types import StringType,IntegerType,DecimalType
from datetime import date
def calculateAmount(loandate,loanamount):
    y,m,d = loandate.split('-')[0],loandate.split('-')[1],loandate.split('-')[2]
    ld = date(int(y),int(m),int(d))
    if (date(2010,1,1) <= ld <= date(2015,12,31)):
        fine = 10
    elif (date(2016,1,1) <= ld <= date.today()):
        fine = 5
    return ((100 + fine)*int(loanamount))/100
spark = SparkSession.builder.appName("User Defined Functions").getOrCreate()
df = spark.read.options(delimiter = "\t",header = True).csv("../input/applicationloan/loan.txt")
calAmount = udf(lambda interest,amount : calculateAmount(interest,amount),DecimalType())
df = df.withColumn("NewLoanAmount",calAmount(col("loandate"),col("loanamount")))
df.show()
The output of the above code is shown in a screenshot (the "NewLoanAmount" column comes back as NULL). A screenshot of the source file "loan.txt", which is tab-delimited, was also attached.
I am creating a new column "NewLoanAmount" using a PySpark udf, but the udf is returning "NULL" values. It seems to be an issue with how the lambda function is called inside the PySpark udf.
How do I write a PySpark UDF with multiple parameters? I understand how to write one with a single parameter, but handling multiple parameters is confusing me.
CodePudding user response:
With your data, you should not create a UDF at all. This is easily done using native Spark functions:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])
def calculateAmount(loandate, loanamount):
    fine = F.when(F.col('loandate').between('2010-01-01', '2015-12-31'), 10) \
        .when(F.col('loandate').between('2016-01-01', F.current_date()), 5)
    return ((100 + fine) * F.col('loanamount').cast('long')) / 100
df = df.withColumn("NewLoanAmount", calculateAmount("loandate", "loanamount"))
df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate  |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89     |20000     |2020-10-01|21000.0      |
# |Monty|02 |45     |10000     |2015-05-05|11000.0      |
# |Roxy |03 |76     |30000     |2010-10-10|33000.0      |
# +-----+---+-------+----------+----------+-------------+
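If you actually want the column typed as a decimal, as in the original attempt with DecimalType, you can simply cast the result of the native-function version. A minimal sketch, assuming a precision/scale of (18, 2):

from pyspark.sql import functions as F

# Cast the computed double to a decimal column; the (18, 2) precision/scale is an assumed choice.
df = df.withColumn("NewLoanAmount", F.col("NewLoanAmount").cast("decimal(18,2)"))
df.printSchema()  # NewLoanAmount should now show as decimal(18,2)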
To answer the original question...
You already have the function definition, so the lambda is not needed. And overall, it is simpler to use the @udf decorator instead of the line
calAmount = udf(lambda interest,amount : calculateAmount(interest,amount),DecimalType())
The following works:
from pyspark.sql.functions import udf, col, array
from pyspark.sql.types import StringType, IntegerType, DecimalType
from datetime import date
df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])
@udf
def calculateAmount(loandate, loanamount):
    y, m, d = loandate.split('-')[0], loandate.split('-')[1], loandate.split('-')[2]
    ld = date(int(y), int(m), int(d))
    if date(2010, 1, 1) <= ld <= date(2015, 12, 31):
        fine = 10
    elif date(2016, 1, 1) <= ld <= date.today():
        fine = 5
    return ((100 + fine) * int(loanamount)) / 100
df = df.withColumn("NewLoanAmount", calculateAmount(col("loandate"), col("loanamount")))
df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate  |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89     |20000     |2020-10-01|21000.0      |
# |Monty|02 |45     |10000     |2015-05-05|11000.0      |
# |Roxy |03 |76     |30000     |2010-10-10|33000.0      |
# +-----+---+-------+----------+----------+-------------+
A way without the @udf decorator:
calAmount = udf(calculateAmount)
or
calAmount = udf(calculateAmount, DoubleType())  # requires: from pyspark.sql.types import DoubleType
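Note that a bare @udf defaults to a string return type, so the "NewLoanAmount" column above is actually a string. The return type can also be passed to the decorator itself; a sketch:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(returnType=DoubleType())
def calculateAmount(loandate, loanamount):
    ...  # same body as in the example above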
In your original code, you declare the return type as DecimalType(), but plain Python arithmetic does not produce decimal values: your calculation is done with double-precision floats. So your function returns a double while Spark expects a decimal, and that mismatch is why you get nulls. To make the Python function return an actual decimal, you would need the decimal module; without it, the result is just a double-precision number.
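For completeness, a minimal sketch of what returning a real decimal could look like, using Python's decimal module. The function name calculateAmountDecimal, the (18, 2) precision/scale, and the fine of 0 outside the two date ranges are assumptions for illustration (the original code leaves that last case undefined):

from decimal import Decimal
from datetime import date
from pyspark.sql.functions import udf
from pyspark.sql.types import DecimalType

@udf(returnType=DecimalType(18, 2))
def calculateAmountDecimal(loandate, loanamount):
    y, m, d = loandate.split('-')[0], loandate.split('-')[1], loandate.split('-')[2]
    ld = date(int(y), int(m), int(d))
    if date(2010, 1, 1) <= ld <= date(2015, 12, 31):
        fine = 10
    elif date(2016, 1, 1) <= ld <= date.today():
        fine = 5
    else:
        fine = 0  # assumption: no fine outside the two date ranges
    # decimal.Decimal values map onto Spark's DecimalType, so no null is produced
    return (Decimal(100 + fine) * Decimal(int(loanamount))) / Decimal(100)

df = df.withColumn("NewLoanAmount", calculateAmountDecimal("loandate", "loanamount"))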