Streamline when/otherwise logic using a udf - Pyspark


I need to create a single column based on other columns, with the following priority logic: zyx > abc > pep > none. I need to do this for several subsets of columns, which is why hand-writing the usual when/otherwise chain for each one would be inefficient. Can this be automated, perhaps with a udf where I pass a list of column names? (The hand-written version I'm trying to avoid is sketched below the example table.)

Here is an example of a subset of columns and expected outcome in col5:

  col1     col2      col3      col4    (...)     col5
  zyx      pep       abc       pep     (...)     zyx
  pep      pep       abc       pep     (...)     abc
  abc      pep       pep       pep     (...)     abc
  abc      pep       pep       pep     (...)     abc
  pep      pep       pep       pep     (...)     pep
  pep      pep       pep       zyx     (...)     zyx
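For context, this is roughly the hand-written logic I'd have to repeat for every subset (a sketch for the col1-col4 subset):

from pyspark.sql import functions as F

df = df.withColumn(
    'col5',
    F.when((F.col('col1') == 'zyx') | (F.col('col2') == 'zyx')
           | (F.col('col3') == 'zyx') | (F.col('col4') == 'zyx'), 'zyx')
     .when((F.col('col1') == 'abc') | (F.col('col2') == 'abc')
           | (F.col('col3') == 'abc') | (F.col('col4') == 'abc'), 'abc')
     .when((F.col('col1') == 'pep') | (F.col('col2') == 'pep')
           | (F.col('col3') == 'pep') | (F.col('col4') == 'pep'), 'pep')
     .otherwise(None))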

CodePudding user response:

Here is one way to do it without a udf, since udfs tend to be slow.

from itertools import chain
from pyspark.sql import functions as F

sort_mapping = {
    'zyx': 1,
    'abc': 2,
    'pep': 3    
}

reverse_mapping = {value: key for key, value in sort_mapping.items()}

# Turn the dicts into literal map columns so they can be referenced from PySpark expressions.
sort_mapping = F.create_map([F.lit(x) for x in chain(*sort_mapping.items())])
reverse_mapping = F.create_map([F.lit(x) for x in chain(*reverse_mapping.items())])
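
At this point sort_mapping and reverse_mapping are no longer Python dicts but literal map columns, equivalent to the SQL expressions map('zyx', 1, 'abc', 2, 'pep', 3) and map(1, 'zyx', 2, 'abc', 3, 'pep').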

col_list = ['col1', 'col2', 'col3', 'col4']

# Collect the columns into an array, map each value to its sort order,
# take the minimum (= highest priority), then map it back to the string.
df = (df.withColumn('arr', F.array(col_list))
      .withColumn('arr', F.array_min(F.transform('arr', lambda x: sort_mapping[x])))
      .withColumn('col5', reverse_mapping[F.col('arr')]))
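
As a quick sanity check, the pipeline can be run against the sample rows from the question (this sketch assumes an active SparkSession named spark and the sort_mapping/reverse_mapping map columns defined above):

data = [('zyx', 'pep', 'abc', 'pep'),
        ('pep', 'pep', 'abc', 'pep'),
        ('abc', 'pep', 'pep', 'pep'),
        ('pep', 'pep', 'pep', 'zyx')]
df = spark.createDataFrame(data, col_list)
df = (df.withColumn('arr', F.array(col_list))
      .withColumn('arr', F.array_min(F.transform('arr', lambda x: sort_mapping[x])))
      .withColumn('col5', reverse_mapping[F.col('arr')]))
df.select(*col_list, 'col5').show()
# col5 comes out as zyx, abc, abc, zyx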

Explanation:

First, collect all the columns of concern into an array.

.withColumn('arr', F.array(col_list))

Then, transform the array into its sort-order values (i.e. zyx -> 1, abc -> 2), so row 1's ['zyx', 'pep', 'abc', 'pep'] becomes [1, 3, 2, 3].

F.transform('arr', lambda x: sort_mapping[x])

Then take the minimum of the array with array_min; the smallest sort value is the highest priority (row 1's [1, 3, 2, 3] gives 1):
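
F.array_min(F.transform('arr', lambda x: sort_mapping[x]))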

Finally, map the value back to the original string. (i.e. 1 -> zyx, 2 -> abc)

.withColumn('col5', reverse_mapping[F.col('arr')])
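
For comparison, the udf version the question asks about could look roughly like the sketch below (pick_highest is a hypothetical name). It accepts any list of columns, but every row round-trips through Python, which is why the native version above is preferable:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(StringType())
def pick_highest(*cols):
    # Return the highest-priority value present in the row, else None.
    for value in ['zyx', 'abc', 'pep']:
        if value in cols:
            return value
    return None

df = df.withColumn('col5', pick_highest('col1', 'col2', 'col3', 'col4'))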

Update

For PySpark < 3.1:

df = (df.withColumn('arr', F.array('col1', 'col2', 'col3', 'col4'))
      .withColumn('sort_mapping', sort_mapping)
      .withColumn('arr', F.array_min(F.expr('transform(arr, x -> sort_mapping[x])')))
      .withColumn('col5', reverse_mapping[F.col('arr')]))

I added sort_mapping as a column so it is accessible from a raw expr, and used SQL's transform instead of the Python API (F.transform with a lambda was only added in PySpark 3.1).
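
The helper columns arr and sort_mapping remain on the DataFrame; if they shouldn't end up in the result, they can be dropped afterwards (a small addition, not part of the original answer):

df = df.drop('arr', 'sort_mapping')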
