PySpark: DataFrame UDF with multiple arguments


I have a dataframe where a column holds an array and each element is a dictionary.

class product
{"deleteDate": null, "class": "AB", "validFrom": "2022-09-01", "validTo": "2009-08-31"},
{"deleteDate": null, "class": "CD", "validFrom": "2009-09-01", "validTo": "2024-08-31"},
{"deleteDate": "2021-09-01", "class": "AB", "validFrom": "2003-09-01", "validTo": "2009-03-01"},
{"deleteDate": null, "class": "CD", "validFrom": "2009-09-01", "validTo": "2024-08-31"}

I am trying to filter the elements based on a few conditions.

import datetime

def getelement(value, entity):
    today = datetime.date.today().isoformat()
    list_url = []
    for i in range(len(value)):
        if value[i] is not None and value[i].deleteDate is None:
            if value[i].validFrom <= today and value[i].validFrom >= today:
                list_url.append(value[i][entity])
    if list_url:
        return str(list_url[-1])
    return None

    
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

udfgeturl = F.udf(lambda z: getelement(z) if z is not None else "", StringType())

master = df.withColumn('ClassName', udfgeturl('Class'))

The function takes two arguments, value and entity, where value refers to a column name and entity refers to a key in the dictionary whose result I want to save. The code works when the UDF takes one argument, getelement(value), but I do not know how the UDF can take two arguments. Any suggestion, please?

CodePudding user response:

You can use a struct to bundle the parameters into one object, then access the elements of the struct with the . operator.

Code example:

import pyspark.sql.functions as f
from pyspark.sql.types import StringType

def getelement(obj):
    value = obj.value
    entity = obj.entity
    return str(entity + " " + value)

udfgeturl = f.udf(getelement, StringType())

df.select(
    udfgeturl(
        f.struct(
            f.col("col1").alias("value"),
            f.col("col2").alias("entity"),
        )
    )
).show()
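
Applied to the question's own function and column names, the same pattern might look like the sketch below; the array column name Class comes from the question, while the entity key "class" and the field aliases are illustrative (f.lit is used because entity is a constant string rather than a column):

import pyspark.sql.functions as f
from pyspark.sql.types import StringType

# Sketch only: wrap the question's two-argument getelement(value, entity)
# so the UDF receives a single struct and unpacks it.
udfgeturl = f.udf(lambda s: getelement(s.value, s.entity), StringType())

master = df.withColumn(
    'ClassName',
    udfgeturl(
        f.struct(
            f.col('Class').alias('value'),   # array column from the question
            f.lit('class').alias('entity'),  # illustrative key to extract
        )
    ),
)

Note that a PySpark UDF can also take several columns directly, e.g. f.udf(getelement, StringType())(f.col('Class'), f.lit('class')); the struct is simply one convenient way to keep the signature at a single argument.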

CodePudding user response:

To improve performance (see Spark functions vs UDF performance?), you could use only Spark transformations.
I'm assuming value[i].validFrom >= today is actually supposed to be value[i].validTo >= today.

import pyspark.sql.functions as f

def getelement(df, value, entity):
    matches = f.expr(
        f"filter({value}, element -> element.deleteDate is null"
        " and element.validFrom <= current_date()"
        " and element.validTo >= current_date())"
    )
    # element_at(..., -1) keeps the last match, like list_url[-1] in the question
    df = df.withColumn('output', f.element_at(matches[entity], -1))
    return df
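
A sketch of how that function might be called, assuming the array column is named Class as in the question and the struct field to keep is class (both names illustrative):

# Illustrative call: 'Class' is the array column, 'class' the struct field to extract.
result = getelement(df, 'Class', 'class')
result.select('output').show(truncate=False)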