How to build F.when based on a variable number of conditions in PySpark


I'm trying to build a series of F.when calls based on a variable number of conditions. How can I build the logic below using a loop where I supply a list of items to test (e.g. [1, 2, 3], following the example below)?

The reason I ask is that I want to be able to build these conditions from a test list of any length. The loop logic should produce something like the expression below, but by passing in a list of numbers to test, [1, 2, 3].

F.when(F.col("test") == 1, "out_" + str(1)).when(F.col("test") == 2, "out_" + str(2)).when(F.col("test") == 3, "out_" + str(3)).otherwise(-1)

I've tried to use reduce to do this, but haven't figured it out yet. Does anyone have any advice?

reduce(lambda x, i: x.when(F.col("test") == i, "out_" + str(i)),
       output_df,
       F).otherwise(-1)

My expected output should provide the same logic as the below:

Column<b'CASE WHEN (test = 1) THEN out_1 WHEN (test = 2) THEN out_2 WHEN (test = 3) THEN out_3 ELSE -1 END'>

CodePudding user response:

You almost got it; you need to pass the list of test values (rather than the DataFrame) as the second argument to reduce:

from functools import reduce
import pyspark.sql.functions as F


tests = [1, 2, 3]

new_col = reduce(
    lambda x, i: x.when(F.col("test") == i, "out_" + str(i)),
    tests,
    F
).otherwise(-1)

print(new_col)

#Column<'CASE WHEN (test = 1) THEN out_1 WHEN (test = 2) THEN out_2 WHEN (test = 3) THEN out_3 ELSE -1 END'>
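To illustrate how the resulting column can be used (not part of the original answer), here is a minimal sketch that applies new_col to a small DataFrame with a test column; the sample data is assumed purely for demonstration:

from functools import reduce

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Assumed sample data, only to show new_col in use
df = spark.createDataFrame([{"test": 1}, {"test": 2}, {"test": 4}])

tests = [1, 2, 3]

# Fold the test values into a chained when(); F is the initial value,
# so the first iteration calls F.when(...) and later ones chain .when()
new_col = reduce(
    lambda x, i: x.when(F.col("test") == i, "out_" + str(i)),
    tests,
    F,
).otherwise(-1)

df.withColumn("result", new_col).show()
# +----+------+
# |test|result|
# +----+------+
# |   1| out_1|
# |   2| out_2|
# |   4|    -1|
# +----+------+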

CodePudding user response:

Since your output is just the tested value with out_ prepended, you could check whether the value is in the predefined list and, if it is, simply prepend out_.

Example:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


data = [
    {"test": 1},
    {"test": 2},
    {"test": 3},
    {"test": 4},
    {"test": 5},
]

test_ints = [1, 2, 3]

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)
df = df.withColumn(
    "result",
    F.when(
        F.col("test").isin(test_ints),
        F.concat(F.lit("out_"), F.col("test")),
    ).otherwise(-1),
)

Result:

+----+------+
|test|result|
+----+------+
|1   |out_1 |
|2   |out_2 |
|3   |out_3 |
|4   |-1    |
|5   |-1    |
+----+------+
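As a variant (not from either answer above): if the desired output were not simply the value with out_ prepended, the reduce approach from the first answer could fold over an arbitrary mapping instead. The label_map below is a hypothetical dict made up for illustration, and df is the DataFrame built in the example above:

from functools import reduce
import pyspark.sql.functions as F

# Hypothetical value -> label mapping (illustration only)
label_map = {1: "low", 2: "mid", 3: "high"}

# Same reduce trick, but each step adds a when() for one (value, label) pair
mapped_col = reduce(
    lambda acc, kv: acc.when(F.col("test") == kv[0], kv[1]),
    label_map.items(),
    F,
).otherwise("unmatched")

df.withColumn("mapped", mapped_col).show()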