I would like to test whether a value in a column exists in a regular Python dict (or a PySpark map) inside a `when().otherwise()` block, but I cannot figure out the correct syntax. There will be multiple `when()` clauses using permutations of the "Count" column, so I need something similar to "if/elif/else". The dictionary/map will be large and will not be a column in the DataFrame.
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from itertools import chain
# Get (or create) the SparkSession used by the rest of the example.
spark = SparkSession.builder.getOrCreate()

# Sample rows as (Category, Count, Description); the last Description is null.
data = [
    ("Category A", 100, "This is category A"),
    ("Category B", 120, "This is category B"),
    ("Category C", 150, None),
]

# Explicit schema: all three columns are nullable.
schema = (
    StructType()
    .add("Category", StringType(), True)
    .add("Count", IntegerType(), True)
    .add("Description", StringType(), True)
)

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, schema)
### Can either match regular python dict, or pyspark map ###
sec_lookup = {120: "new_category"}
# chain.from_iterable flattens the (key, value) pairs lazily instead of
# unpacking every pair of a large dict into positional arguments.
sec_lookup_map = F.create_map(
    *[F.lit(x) for x in chain.from_iterable(sec_lookup.items())]
)

# A Column is an unevaluated Spark expression, so Python's `in` cannot test
# it. Column.isin() builds the membership predicate instead; iterating the
# dict yields its keys.
# Map-based equivalent (valid only if no dict value is None):
#     sec_lookup_map[F.col("Count")].isNotNull()
# NOTE(review): isin embeds every key as a literal in the query plan; for a
# very large lookup, a broadcast join against a DataFrame of the keys is
# usually preferable.
user_df = df.withColumn(
    "new_col",
    F.when(
        F.col("Count").isin(list(sec_lookup)),
        F.concat(F.col("Category"), F.lit("_add")),
    )
    # Additional .when(condition, value) clauses chain here, in order,
    # playing the role of elif branches.
    .otherwise(F.concat(F.col("Category"), F.lit("_old"))),
)
Answer:
Use `isin`.
I don't think you need if/elif/else here. A single if/else (one `when()` plus `otherwise()`) is enough, since you are only checking membership against the dictionary keys:
sec_lookup = {120: "new_category"}

# Column.isin() takes the candidate values; iterating a dict yields its keys.
is_known_count = F.col("Count").isin(list(sec_lookup))
tagged = F.when(is_known_count, F.concat("Category", F.lit("_new")))
df = df.withColumn("new", tagged.otherwise(F.concat("Category", F.lit("_old"))))
df.show()
+----------+-----+------------------+--------------+
|  Category|Count|       Description|           new|
+----------+-----+------------------+--------------+
|Category A|  100|This is category A|Category A_old|
|Category B|  120|This is category B|Category B_new|
|Category C|  150|              null|Category C_old|
+----------+-----+------------------+--------------+