Pyspark udf for populating a column based on two other columns


I have the following issue, since I am a bit of a noob in pyspark. Based on the values in two columns, I want to populate a third column. The conditions are:

  • if semicolon contained in col2, check col1
    • if col1 == 1, take the value before the semicolon
    • if col1 == 2, take the value after the semicolon
  • if no semicolon in col2, take the value from col2 as-is

This is what the dataframe looks like.

col1 col2      col3
1    24.9;34.9 24.9
2    24.9;34.9 34.9
1    80.8;90.9 80.8
2    80.8;90.9 90.9
1    777       777

I made the following udf, which gives the error "Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions".

import pyspark.sql.functions as F

def split_by_semicolon_if_exists(col1,col2):
    if (col1.contains(';') == True):
        if col2 == 1:
          result = F.substring(col1, 0, (F.length(col1) - F.expr("locate(';', col1) - 1")))
        if col2 == 2:
          result = F.substring(col1, F.expr("locate(';', col1) - 1"), (F.length(col1) - F.expr("locate(';', col1) - 1")))
        return result
    else:
      return col1

df = df.withColumn('col3', 
                     split_by_semicolon_if_exists(df['col1'], 
                                                  df['col2']))

I have built this udf by googling for the various functions so there probably are multiple issues with it. Can you please help me build a udf for this case?

CodePudding user response:

Take a look at the split function (used in the second snippet below).

Using UDF:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
data = [
    {"col1": 1, "col2": "24.9;34.9"},
    {"col1": 2, "col2": "24.9;34.9"},
    {"col1": 1, "col2": "80.8;90.9"},
    {"col1": 1, "col2": "777"},
]
df = spark.createDataFrame(data)


def get_value(item, value):
    # Pick the item-th piece of the semicolon-separated value (1-based);
    # if there is no semicolon, return the value unchanged.
    if ";" in value:
        return value.split(";")[item - 1]
    return value


df = df.withColumn("col3", F.udf(get_value, StringType())(F.col("col1"), F.col("col2")))

Without UDF:

df = df.withColumn(
    "col3",
    F.when(
        F.col("col2").contains(";"), F.split("col2", ";").getItem(F.col("col1") - 1)
    ).otherwise(F.col("col2")),
)
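
Note that the version without a UDF is generally preferable: built-in column expressions run inside Spark's Catalyst optimizer, while a Python UDF has to serialize every row out to a Python worker and back, which is considerably slower.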

Result:

root
 |-- col1: long (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)

+----+---------+----+
|col1|col2     |col3|
+----+---------+----+
|1   |24.9;34.9|24.9|
|2   |24.9;34.9|34.9|
|1   |80.8;90.9|80.8|
|1   |777      |777 |
+----+---------+----+

CodePudding user response:

You can use this code:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    data=[(1, "24.9;34.9"),
          (2, "24.9;34.9"),
          (1, "80.8;90.9"),
          (2, "80.8;90.9"),
          (1, "777")],
    schema=["col1", "col2"])
df.show()

def split_by_semicolon_if_exists(col1, col2):
    # col1 chooses which part of the semicolon-separated col2 to keep;
    # if col2 has no semicolon (or col1 is neither 1 nor 2), return col2 unchanged.
    if ';' in col2:
        if col1 == 1:
            return col2.split(';')[0]
        if col1 == 2:
            return col2.split(';')[1]
    return col2

split_by_semicolon_if_exists_udf = F.udf(split_by_semicolon_if_exists, T.StringType())

df = df.withColumn('col3', split_by_semicolon_if_exists_udf(df['col1'], df['col2']))
df.show()
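
With this data, the second df.show() should print something like:

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   1|24.9;34.9|24.9|
|   2|24.9;34.9|34.9|
|   1|80.8;90.9|80.8|
|   2|80.8;90.9|90.9|
|   1|      777| 777|
+----+---------+----+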

To apply a plain Python function to the columns of your dataframe, you have to declare it as a udf with F.udf(function, return type).

You can check this documentation: https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
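
As a side note, F.udf can also be used as a decorator, which saves the separate declaration step. A minimal sketch of the same logic in that style (untested):

import pyspark.sql.functions as F
import pyspark.sql.types as T

@F.udf(returnType=T.StringType())
def split_by_semicolon_if_exists(col1, col2):
    # col1 selects the 1-based piece of the semicolon-separated col2
    if ';' in col2:
        return col2.split(';')[col1 - 1]
    return col2

df = df.withColumn('col3', split_by_semicolon_if_exists(df['col1'], df['col2']))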

Also, Python has convenient built-in string operations, such as

if 'string' in stringVariable:

(evaluates to True/False depending on whether the substring is present in the main string)

You can also split a string on a given character using

string.split(';')

(returns a list of the separated parts)
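
For example, in plain Python:

value = "24.9;34.9"
";" in value          # True
value.split(";")      # ['24.9', '34.9']
value.split(";")[0]   # '24.9'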

CodePudding user response:

You can use expr without having to use a udf here. Since Spark's array indexing starts from 0, you need to subtract 1 from your col1:

from pyspark.sql import functions as F

df.withColumn("Result",F.expr("""split(col2,';')[int(col1)-1]""")).show()

+----+---------+-----+------+
|col1|     col2| col3|Result|
+----+---------+-----+------+
|   1|24.9;34.9| 24.9|  24.9|
|   2|24.9;34.9| 34.9|  34.9|
|   1|80.8;90.9| 80.8|  80.8|
|   2|80.8;90.9| 90.9|  90.9|
|   1|      777|777.0|   777|
+----+---------+-----+------+

The new column Result is the same as your expected output in col3.
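
As an aside, if you want to avoid the manual -1, one possible alternative (a sketch, untested) is element_at, which uses 1-based indexing:

df.withColumn("Result", F.expr("element_at(split(col2, ';'), int(col1))")).show()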
