Pyspark check if value in dictionary or map using when() otherwise()

Time:04-22

I would like to test whether a value in a column exists in a regular Python dict, or in a PySpark map, inside a when().otherwise() block, but I cannot figure out the correct syntax. There will be multiple when() clauses using permutations of the "Count" column, so I need something similar to if/elif/else. The dictionary/map will be large and will not be a column in the DataFrame.

from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from itertools import chain

spark = (SparkSession
         .builder
         .getOrCreate())

data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, None)]

schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, schema)

# Can match either a regular Python dict or a PySpark map
sec_lookup = {120: "new_category"}
sec_lookup_map = F.create_map(*[F.lit(x) for x in chain(*sec_lookup.items())])

user_df = df.withColumn(
    "new_col",
    F.when(
        df["Count"].value in sec_lookup.keys(),  # <--- WHAT IS CORRECT SYNTAX?
        F.concat(F.col("Category"), F.lit("_add"))
    ).when(
        ...
        ...
    )
    .otherwise(
        F.concat(F.col("Category"), F.lit("_old"))
    )
)
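For reference on the if/elif/else requirement: a chain of when() calls builds a SQL CASE WHEN, which checks the conditions in order, returns the value of the first condition that is true, and otherwise falls back to the otherwise() value (or null when otherwise() is omitted); a condition that evaluates to null is not treated as true. The plain-Python sketch below models only those per-row semantics, not how Spark implements them; the helper `case_when` and the second `Count == 150` branch are hypothetical, chosen for illustration.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]
# A branch is (condition, value); the condition may return None to mimic SQL null.
Branch = Tuple[Callable[[Row], Optional[bool]], Callable[[Row], Any]]


def case_when(row: Row, branches: List[Branch],
              otherwise: Optional[Callable[[Row], Any]] = None) -> Any:
    """Hypothetical per-row model of F.when(...).when(...).otherwise(...)."""
    for condition, value in branches:
        # The first truthy condition wins; False and None (SQL null) both
        # fall through to the next branch, as in CASE WHEN.
        if condition(row):
            return value(row)
    # Without otherwise() the result is null, modelled here as None.
    return otherwise(row) if otherwise is not None else None


sec_lookup = {120: "new_category"}
rows = [
    {"Category": "Category A", "Count": 100},
    {"Category": "Category B", "Count": 120},
    {"Category": "Category C", "Count": 150},
]
branches: List[Branch] = [
    # "if": Count is one of the dict keys
    (lambda r: r["Count"] in sec_lookup, lambda r: r["Category"] + "_add"),
    # "elif": a hypothetical second condition
    (lambda r: r["Count"] == 150, lambda r: r["Category"] + "_other"),
]
results = [case_when(r, branches, lambda row: row["Category"] + "_old") for r in rows]
print(results)  # ['Category A_old', 'Category B_add', 'Category C_other']
```

In PySpark each extra `.when(condition, value)` appended to the chain plays the role of one more `elif`.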

Answer:

With isin() I believe you won't need if/elif/else; a plain if/else should be enough, since you only check membership against the dictionary keys:

sec_lookup = {120: "new_category"}

df = df.withColumn(
    "new",
    F.when(F.col("Count").isin([*sec_lookup.keys()]),
           F.concat("Category", F.lit("_new")))
     .otherwise(F.concat("Category", F.lit("_old")))
)

df.show()
+----------+-----+------------------+--------------+
|  Category|Count|       Description|           new|
+----------+-----+------------------+--------------+
|Category A|  100|This is category A|Category A_old|
|Category B|  120|This is category B|Category B_new|
|Category C|  150|              null|Category C_old|
+----------+-----+------------------+--------------+