Let's say I have a dataframe like the following:
| id | col     |
| 1  | "A,B,C" |
| 2  | "D,C"   |
| 3  | "B,C,A" |
| 4  | None    |
and the dictionary is:
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
the output dataframe must be:
| id | col  |
| 1  | "A"  |
| 2  | "C"  |
| 3  | "A"  |
| 4  | None |
CodePudding user response:
The higher-order function transform can be used to associate a rank with each element in col based on the dictionary, so that the element with the lowest rank can then be picked out with array_min.
from pyspark.sql import functions as F
from itertools import chain
data = [(1, "A,B,C",),
        (2, "D,C",),
        (3, "B,C,A",),
        (4, None,), ]
df = spark.createDataFrame(data, ("id", "col", ))
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
mapper = F.create_map([F.lit(c) for c in chain.from_iterable(d.items())])
"""
Mapper has the value Column<'map(A, 1, B, 2, C, 3, D, 4)'>
"""
(df.withColumn("col", F.split(F.col("col"), ","))  # Split the string to create an array
   .withColumn("mapper", mapper)  # Add the mapping column to the dataframe
   .withColumn("col", F.expr("transform(col, x -> struct(mapper[x] as rank, x as col))"))  # Look up each element's rank in mapper and wrap it in a struct
   .withColumn("col", F.array_min(F.col("col")).col)  # array_min finds the minimum struct based on its first field (rank)
 ).select("id", "col").show()
"""
--- ----
| id| col|
--- ----
| 1| A|
| 2| C|
| 3| A|
| 4|null|
--- ----
"""
CodePudding user response:
Here's another solution that relies on struct ordering like @Nithish's answer, but uses arrays_zip and array_min instead:
- Create an array of weights from the dict (ordered by the keys)
- Zip the array of weights with the sorted result of splitting col (a sketch of the resulting zipped array is shown after the output below)
- Get the array min of the zipped array of structs
import pyspark.sql.functions as F
df = spark.createDataFrame([(1, "A,B,C"), (2, "D,C"), (3, "B,C,A"), (4, None)], ["id", "col"])
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
result = df.withColumn(
    "col",
    F.array_min(
        F.arrays_zip(
            F.array(*[F.lit(d[x]) for x in sorted(d)]),
            F.array_sort(F.split("col", ","))
        )
    )["1"]
)
result.show()
# +---+----+
# | id| col|
# +---+----+
# |  1|   A|
# |  2|   C|
# |  3|   A|
# |  4|null|
# +---+----+
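To see why this works, here is a minimal sketch of the intermediate zipped array that array_min reduces (it reuses df and d from above; the column name "zipped" is only illustrative):
# Sketch only: show the zipped array of (weight, letter) structs before array_min.
df.withColumn(
    "zipped",
    F.arrays_zip(
        F.array(*[F.lit(d[x]) for x in sorted(d)]),  # weights ordered by key: [1, 2, 3, 4]
        F.array_sort(F.split("col", ","))            # letters sorted alphabetically
    )
).select("id", "zipped").show(truncate=False)
# For id=2 ("D,C") the array holds (1, C), (2, D), (3, null), (4, null): arrays_zip pads the
# shorter array with nulls. array_min compares structs on their first field, so (1, C) wins
# and the ["1"] accessor extracts the letter "C".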
CodePudding user response:
I assume you want to sort the letters according to the values given in the dictionary d. Then, you can do the following:
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
spark = SparkSession.builder.master("local").appName("sort_column_test").getOrCreate()
df = spark.createDataFrame(data=(Row(1, "A,B,C",),
                                 Row(2, "D,C",),
                                 Row(3, "B,C,A",),
                                 Row(4, None)),
                           schema="id:int, col:string")
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
# Define a sort UDF that sorts the array according to the dictionary 'd', also handles None arrays
sort_udf = F.udf(lambda array: sorted(array, key=lambda x: d[x]) if array is not None else None,
                 T.ArrayType(T.StringType()))
df = df.withColumn("col", sort_udf(F.split(F.col("col"), ",")).getItem(0))
df.show()
"""
--- ----
| id| col|
--- ----
| 1| A|
| 2| C|
| 3| A|
| 4|null|
--- ----
"""