Home > Software engineering >  How to rank the column values based on a dictionary and retain the highest value?
How to rank the column values based on a dictionary and retain the highest value?

Time:12-30

Lets say I have a dataframe as following:

| id |col
| 1  | "A,B,C"
| 2  | "D,C"
| 3  | "B,C,A"
| 4  | None

and the dictionary is :

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

the output dataframe must be :

| id |col
| 1  | "A"
| 2  | "C"
| 3  | "A"
| 4  | None

CodePudding user response:

Higher Order Functions - Transform can be used to associate a rank the elements in col based on the dictionary and then sorted to get the element with the lowest rank.

from pyspark.sql import functions as F
from itertools import chain

data = [(1, "A,B,C",),
        (2, "D,C",),
        (3, "B,C,A",),
        (4, None,), ]
df = spark.createDataFrame(data, ("id", "col", ))

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

mapper = F.create_map([F.lit(c) for c in chain.from_iterable(d.items())])

"""
Mapper has the value Column<'map(A, 1, B, 2, C, 3, D, 4)'>
"""

(df.withColumn("col", F.split(F.col("col"), ",")) # Split string to create an array
  .withColumn("mapper", mapper) # Add mapping columing to the dataframe
  .withColumn("col", F.expr("transform(col, x -> struct(mapper[x] as rank, x as col))")) # Iterate over array and look up rank from mapper
  .withColumn("col", F.array_min(F.col("col")).col) # array_min find minimum value based on the first struct field
).select("id", "col").show()

"""
 --- ---- 
| id| col|
 --- ---- 
|  1|   A|
|  2|   C|
|  3|   A|
|  4|null|
 --- ---- 
"""

CodePudding user response:

Here's another solution with struct ordering as @Nithish answer but using arrays_zip and array_min instead:

  1. Create array of weights from the dict (ordered by the keys)
  2. Zip the array of weights with result of split col sorted
  3. Get array min of the zipped array of structs
import pyspark.sql.functions as F

df = spark.createDataFrame([(1, "A,B,C"), (2, "D,C"), (3, "B,C,A"), (4, None)], ["id", "col"])
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

result = df.withColumn(
    "col",
    F.array_min(
        F.arrays_zip(
            F.array(*[F.lit(d[x]) for x in sorted(d)]), 
            F.array_sort(F.split("col", ","))
        )
    )["1"]
)

result.show()
# --- ---- 
#| id| col|
# --- ---- 
#|  1|   A|
#|  2|   C|
#|  3|   A|
#|  4|null|
# --- ---- 

CodePudding user response:

I assume you want to sort the letters according to the values given in the dictionary d.

Then, you can do the following:

from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("sort_column_test").getOrCreate()

df = spark.createDataFrame(data=(Row(1, "A,B,C",),
                                 Row(2, "D,C",),
                                 Row(3, "B,C,A",),
                                 Row(4, None)),
                           schema="id:int, col:string")
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

# Define a sort UDF that sorts the array according to the dictionary 'd', also handles None arrays
sort_udf = F.udf(lambda array: sorted(array,
                                      key=lambda x: d[x]) if array is not None else None,
                 T.ArrayType(T.StringType()))
df = df.withColumn("col", sort_udf(F.split(F.col("col"), ",")).getItem(0))
df.show()

"""
 --- ---- 
| id| col|
 --- ---- 
|  1|   A|
|  2|   C|
|  3|   A|
|  4|null|
 --- ---- 
"""





  • Related