Pandas UDF with dictionary lookup and conditionals

Time:09-20

I want to use pandas_udf in PySpark for certain transformations and calculations on columns, but it seems that a pandas UDF can't be written exactly like a normal UDF.

An example function looks something like this:

def modify_some_column(example_column_1, example_column_2):
    lookup_dict = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}  # can be anything

    if example_column_1 in lookup_dict:
        if example_column_1 == 'a' and example_column_2 == "something":
            return lookup_dict[example_column_1]
        elif example_column_1 == 'a' and example_column_2 == "something else":
            return "something else"
        else:
            return lookup_dict[example_column_1]
    else:
        return ""

Basically, it takes two column values from a Spark DataFrame and returns a value that I intend to use with withColumn:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

modify_some_column_udf = pandas_udf(modify_some_column, returnType=StringType())
df = df.withColumn('new_col', modify_some_column_udf(df.col_1, df.col_2))

But this does not work. How should I modify the above so that it works as a pandas UDF?

Edit: It is clear to me that the above conditions can be implemented easily and efficiently with native PySpark functions, but I am looking to write this logic as a pandas UDF.
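To see why the function cannot be handed to pandas_udf as is: with a Series-to-Series pandas UDF, Spark calls the function once per batch and passes each column as a pandas Series, not one value per row. The pandas-only sketch below (sample values are made up for illustration) shows the two constructs that break, `x in lookup_dict` and the Python `and`, plus their element-wise replacements `Series.isin` and `&`.

```python
import pandas as pd

lookup_dict = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}

# A Series-to-Series pandas UDF receives one pandas Series per argument
# for each batch, not one scalar per row (illustrative sample batch).
batch_c1 = pd.Series(['a', 'c', 'f'])
batch_c2 = pd.Series(['something', 'blah', 'blah'])


def raised(fn):
    """Return the type of exception raised by fn(), or None if it succeeds."""
    try:
        fn()
    except Exception as exc:
        return type(exc)
    return None


# `x in dict` hashes x; a pandas Series is mutable and unhashable -> TypeError
membership_error = raised(lambda: batch_c1 in lookup_dict)

# `and` calls bool() on a Series, whose truth value is ambiguous -> ValueError
and_error = raised(lambda: (batch_c1 == 'a') and (batch_c2 == 'something'))

# Element-wise equivalents that work on whole batches
in_lookup = batch_c1.isin(list(lookup_dict))           # per-row membership test
both = (batch_c1 == 'a') & (batch_c2 == 'something')   # per-row logical AND
```

So the if/else logic has to be rewritten in terms of column-wise operations (or applied row by row inside the batch) before it can run as a pandas UDF.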

Answer:

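With this simple if/else logic, you don't need a UDF at all. In fact, you should avoid UDFs whenever possible: Python UDFs move data between the JVM and Python workers and are opaque to Spark's optimizer, so native functions are usually faster.

Assuming you have the following DataFrame: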

df = spark.createDataFrame([
    ('a', 'something'),
    ('a', 'something else'),
    ('c', None),
    ('c', ''),
    ('c', 'something'),
    ('c', 'something else'),
    ('c', 'blah'),
    ('f', 'blah'),
], ['c1', 'c2'])
df.show()

+---+--------------+
| c1|            c2|
+---+--------------+
|  a|     something|
|  a|something else|
|  c|          null|
|  c|              |
|  c|     something|
|  c|something else|
|  c|          blah|
|  f|          blah|
+---+--------------+

You can turn the dictionary into a temporary map column and look up each row's c1 value in it:

import json
import pyspark.sql.functions as F

your_lookup_dict = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}

(df
    # Serialize the dict to JSON, then parse it back as a map<string, string> column
    .withColumn('lookup', F.from_json(F.lit(json.dumps(your_lookup_dict)), 'map<string, string>'))
    # Index the map with each row's c1 value; keys missing from the map give null
    .withColumn('mod', F
        .when((F.col('c1') == 'a') & (F.col('c2') == 'something'), F.col('lookup')[F.col('c1')])
        .when((F.col('c1') == 'a') & (F.col('c2') == 'something else'), F.lit('something else'))
        .otherwise(F.col('lookup')[F.col('c1')])
    )
    .show(10, False)
)

+---+--------------+----------------------------------------+--------------+
|c1 |c2            |lookup                                  |mod           |
+---+--------------+----------------------------------------+--------------+
|a  |something     |{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|1             |
|a  |something else|{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|something else|
|c  |null          |{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|3             |
|c  |              |{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|3             |
|c  |something     |{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|3             |
|c  |something else|{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|3             |
|c  |blah          |{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|3             |
|f  |blah          |{a -> 1, b -> 2, c -> 3, d -> 4, e -> 5}|null          |
+---+--------------+----------------------------------------+--------------+

EDIT

Since you want to use pandas, keep in mind that Spark hands your data to pandas in batches rather than row by row, so you have to wrap your function in something like this (using mapInPandas, which passes an iterator of pandas DataFrames to your function):

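import pyspark.sql.functions as F

def wrapper(iterator):
    def modify_some_column(example_column_1, example_column_2):
        lookup_dict = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}  # can be anything
        if example_column_1 in lookup_dict:
            if example_column_1 == 'a' and example_column_2 == "something":
                # Cast to str because the output 'mod' column is a string column
                return str(lookup_dict[example_column_1])
            elif example_column_1 == 'a' and example_column_2 == "something else":
                return "something else"
            else:
                return str(lookup_dict[example_column_1])
        else:
            return ""

    # Each pdf is one batch of rows as a pandas DataFrame
    for pdf in iterator:
        # Apply the scalar function row by row within the batch
        pdf['mod'] = pdf.apply(lambda r: modify_some_column(r['c1'], r['c2']), axis=1)
        yield pdf

# Add a placeholder 'mod' column so that df.schema already describes the output;
# mapInPandas needs the output schema up front
df = df.withColumn('mod', F.lit('temp'))
df.mapInPandas(wrapper, df.schema).show()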

+---+--------------+--------------+
| c1|            c2|           mod|
+---+--------------+--------------+
|  a|     something|             1|
|  a|something else|something else|
|  c|          null|             3|
|  c|              |             3|
|  c|     something|             3|
|  c|something else|             3|
|  c|          blah|             3|
|  f|          blah|              |
+---+--------------+--------------+
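As an alternative to the row-wise apply above, the same logic can be written as a true Series-to-Series function, which is the shape a scalar pandas_udf expects. This is a sketch assuming Spark 3.0 or later, where the pd.Series type hints let pandas_udf infer the UDF type; the names modify_some_column_vectorized and LOOKUP are illustrative, not from the original. Like the original function (and unlike the native-column version), keys missing from the dictionary yield "" rather than null.

```python
import pandas as pd

# Same lookup as in the question; values stored as strings because the
# UDF would be declared with a string return type.
LOOKUP = {k: str(v) for k, v in {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}.items()}


def modify_some_column_vectorized(c1: pd.Series, c2: pd.Series) -> pd.Series:
    """Series-to-Series version of modify_some_column.

    Each argument is a whole batch of column values, so every step is a
    column-wise pandas operation instead of a per-row if/else.
    """
    # Dictionary lookup for the whole batch; keys not in the dict (and nulls)
    # become NaN, which is replaced by "" as in the original function.
    result = c1.map(LOOKUP).fillna("")
    # The ('a', 'something') case returns the lookup value, which the line
    # above already produced; only ('a', 'something else') needs overriding.
    special = (c1 == 'a') & (c2 == 'something else')
    return result.mask(special, 'something else')


# Hypothetical registration in PySpark (not executed here):
#   from pyspark.sql.functions import pandas_udf
#   modify_some_column_udf = pandas_udf(modify_some_column_vectorized, returnType="string")
#   df = df.withColumn('mod', modify_some_column_udf('c1', 'c2'))

# Plain-pandas check on a small, made-up batch
example = modify_some_column_vectorized(
    pd.Series(['a', 'a', 'c', 'c', 'f']),
    pd.Series(['something', 'something else', None, 'blah', 'blah']),
)
```

Because the work is done with whole-column operations (map, fillna, mask) instead of apply(axis=1), this version avoids calling a Python function once per row inside each batch.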