User Defined Function with multiple parameters is returning NULL value

Time:07-12

I am trying to convert a python function to PySpark user defined function as below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col,array
from pyspark.sql.types import StringType,IntegerType,DecimalType
from datetime import date

def calculateAmount(loandate,loanamount):
    y,m,d = loandate.split('-')[0],loandate.split('-')[1],loandate.split('-')[2]
    ld = date(int(y),int(m),int(d))
    if (date(2010,1,1) <= ld <= date(2015,12,31)):
        fine = 10
    elif (date(2016,1,1) <=ld <= date.today()):
        fine = 5
    return ((100 + fine)*int(loanamount))/100

spark = SparkSession.builder.appName("User Defined Functions").getOrCreate()
df = spark.read.options(delimiter = "\t",header = True).csv("../input/applicationloan/loan.txt")

calAmount = udf(lambda interest,amount : calculateAmount(interest,amount),DecimalType())

df = df.withColumn("NewLoanAmount",calAmount(col("loandate"),col("loanamount")))
df.show()

Output of the above code: [screenshot "Output" not preserved]

Source file "loan.txt": [screenshot not preserved]

The above source file is tab delimited.

I am creating a new column "NewLoanAmount" using a PySpark UDF, but the UDF returns NULL for every row. It seems to be an issue with how the lambda function is called inside the UDF.

How do I write a PySpark UDF with multiple parameters? I understand how to write one with a single parameter, but I find working with multiple parameters confusing.

CodePudding user response:

With your data, you should not create UDF. It's easily done using native Spark functions:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])

def calculateAmount(loandate, loanamount):
    # Use the column names passed in rather than hard-coded ones.
    fine = F.when(F.col(loandate).between('2010-01-01', '2015-12-31'), 10) \
            .when(F.col(loandate).between('2016-01-01', F.current_date()), 5)
    return ((100 + fine) * F.col(loanamount).cast('long')) / 100

df = df.withColumn("NewLoanAmount", calculateAmount("loandate", "loanamount"))

df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate  |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89     |20000     |2020-10-01|21000.0      |
# |Monty|02 |45     |10000     |2015-05-05|11000.0      |
# |Roxy |03 |76     |30000     |2010-10-10|33000.0      |
# +-----+---+-------+----------+----------+-------------+

To answer the original question...

You already have the function definition, so lambda is not needed. And overall, it's simpler to use the decorator @udf instead of the line
calAmount = udf(lambda interest,amount : calculateAmount(interest,amount),DecimalType())

The following works:

from pyspark.sql.functions import udf, col, array
from pyspark.sql.types import StringType, IntegerType, DecimalType
from datetime import date

df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])

@udf
def calculateAmount(loandate, loanamount):
    y, m, d = loandate.split('-')[0], loandate.split('-')[1], loandate.split('-')[2]
    ld = date(int(y), int(m), int(d))
    if (date(2010, 1, 1) <= ld <= date(2015, 12, 31)):
        fine = 10
    elif (date(2016, 1, 1) <= ld <= date.today()):
        fine = 5
    return ((100 + fine) * int(loanamount)) / 100

df = df.withColumn("NewLoanAmount", calculateAmount(col("loandate"), col("loanamount")))

df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate  |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89     |20000     |2020-10-01|21000.0      |
# |Monty|02 |45     |10000     |2015-05-05|11000.0      |
# |Roxy |03 |76     |30000     |2010-10-10|33000.0      |
# +-----+---+-------+----------+----------+-------------+

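A way without the @udf decorator (calculateAmount is then defined as a plain Python function, without @udf above it):

calAmount = udf(calculateAmount)  # return type defaults to StringType

or

from pyspark.sql.types import DoubleType

calAmount = udf(calculateAmount, DoubleType())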

In your original code you declare DecimalType() as the return type, but the Python function does not return a decimal: the / operator on integers produces a float, which is a double precision number. Spark expects a decimal value for a DecimalType column, and when a regular Python UDF returns a value whose type does not match the declared return type, the result is null rather than an error. This is why you get nulls. To make the Python function return a decimal, you would need to use the decimal library; otherwise, declare DoubleType() so that the declared type matches the float the function actually returns.
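If you do want a decimal column, here is a minimal sketch of the decimal-library route mentioned above. The names calculate_amount_decimal and make_cal_amount_udf are hypothetical. Returning None for missing values and for dates outside both ranges, and the DecimalType(12, 2) precision and scale, are assumptions for illustration and not part of the original code. The calculation is kept in a plain Python function so it can be checked without a Spark session.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_UP


def calculate_amount_decimal(loandate, loanamount):
    """Return the loan amount plus the fine as a Decimal, or None if it cannot be computed."""
    if loandate is None or loanamount is None:
        return None  # Spark passes SQL NULL to a Python UDF as None
    ld = date.fromisoformat(loandate)  # expects 'YYYY-MM-DD'
    if date(2010, 1, 1) <= ld <= date(2015, 12, 31):
        fine = Decimal(10)
    elif date(2016, 1, 1) <= ld <= date.today():
        fine = Decimal(5)
    else:
        return None  # assumption: dates outside both ranges produce NULL
    amount = (Decimal(100) + fine) * Decimal(int(loanamount)) / Decimal(100)
    # Round to two places so the value matches the scale declared for the UDF.
    return amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def make_cal_amount_udf():
    """Wrap the function as a Spark UDF (pyspark is imported only when this is called)."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DecimalType
    # DecimalType(12, 2): up to 12 digits in total, 2 of them after the decimal point.
    return udf(calculate_amount_decimal, DecimalType(12, 2))
```

It would then be applied like the other variants, for example df.withColumn("NewLoanAmount", make_cal_amount_udf()(col("loandate"), col("loanamount"))). Because the function now returns decimal.Decimal values, the declared DecimalType and the actual return type should agree.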
