PySpark withColumn: add a dataframe column based on an array


I have a dataframe:

   "people"      "other"
   father         ...
   mother
   cat
   brother
   dog

I want to insert a new column: if the word in the people column appears in a specific array, replace it with the corresponding word from a second array; otherwise keep the word unchanged:

array=['father','mother','brother']
array_new=['dad','mum','bro']

   "people"      "other"     "new"
   father         ...        dad
   mother                    mum
   cat                       cat
   brother                   bro
   dog                       dog
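
For reference, here is a minimal sketch that builds this sample dataframe (assuming an active SparkSession named spark; the values in the other column are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# "other" values are placeholders; only "people" matters for this question
df = spark.createDataFrame(
    [("father", "..."), ("mother", "..."), ("cat", "..."),
     ("brother", "..."), ("dog", "...")],
    ["people", "other"],
)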

I think to use this:

expression = ("CASE "  "".join(["WHEN people LIKE '{}' THEN '{}' ".format(val,array_new[array.index(val)](val)) for val in array])  "ELSE 'None' END")

df_pyspark = df_pyspark.withColumn("new", functions.expr(expression))

I need to change the ELSE branch so that it copies the original word, but I don't know how to do that.

CodePudding user response:

You can use a literal map expression, built from array and array_new with the create_map function, and fall back to the original value with coalesce:

from pyspark.sql import functions as F
from itertools import chain

# build a map literal: {'father': 'dad', 'mother': 'mum', 'brother': 'bro'}
mapping = F.create_map(*[F.lit(x) for x in chain(*zip(array, array_new))])

# look up people in the map; fall back to the original value when there is no match
df1 = df.withColumn("new", F.coalesce(mapping[F.col("people")], F.col("people")))
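
With the sample dataframe sketched above, this gives the expected result (output shown as a sketch; exact formatting may vary):

df1.show()
# +-------+-----+---+
# | people|other|new|
# +-------+-----+---+
# | father|  ...|dad|
# | mother|  ...|mum|
# |    cat|  ...|cat|
# |brother|  ...|bro|
# |    dog|  ...|dog|
# +-------+-----+---+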

Or use na.replace, passing a dict built from the two arrays:

from pyspark.sql import functions as F

# {'father': 'dad', 'mother': 'mum', 'brother': 'bro'}
mapping_dict = dict(zip(array, array_new))

# copy people into new, then replace only the values that appear as keys
df1 = df.withColumn("new", F.col("people")).na.replace(mapping_dict, subset=["new"])
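
na.replace does exact-match replacement, so values without a key in the dict (cat, dog) pass through unchanged. Since df.replace is an alias of na.replace in PySpark, an equivalent form (a minor variant, not from the original answer) is:

df1 = df.withColumn("new", F.col("people")).replace(mapping_dict, subset=["new"])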

Another way is to chain multiple when expressions with reduce:

from functools import reduce
from pyspark.sql import functions as F

# start from the functions module so the first step calls F.when(...);
# each later step chains .when(...) onto the accumulated Column
when_expr = reduce(
    lambda acc, x: acc.when(F.col("people") == x[0], x[1]),
    zip(array, array_new),
    F
).otherwise(F.col("people"))

df1 = df.withColumn("new", when_expr)
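
For the sample arrays, the reduce above unrolls to the following chained expression (expanded here only for clarity):

when_expr = (
    F.when(F.col("people") == "father", "dad")
     .when(F.col("people") == "mother", "mum")
     .when(F.col("people") == "brother", "bro")
     .otherwise(F.col("people"))
)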