Assume that I have a Spark DataFrame as below:
data = [("A", "A", 1),
        ("A", "A", 2),
        ("A", "A", 3),
        ("A", "B", 4),
        ("A", "B", 5),
        ("A", "C", 6),
        ("A", "D", 7),
        ("A", "E", None),
        ]
columns = ["col_1", "col_2", "col_3"]
df = spark.createDataFrame(data=data, schema=columns)
I want to get a list of unique entries for every column and save the results in a DataFrame. The output will be:
| Column_Name | Unique_Values               |
|-------------|-----------------------------|
| col_1       | ['A']                       |
| col_2       | ['A', 'B', 'C', 'D', 'E']   |
| col_3       | [1, 2, 3, 4, 5, 6, 7, Null] |
Any idea how to do that?
CodePudding user response:
One way to achieve this is:
- Introduce a dummy column with some constant value.
- Group by this dummy column and collect every column as a set.
- Since the result is "summary" data, it won't hurt to convert it to Pandas. Transposing a DataFrame is a little too much work in Spark < v3.4, where the melt() function is not available (for Spark >= 3.4, see the melt()/unpivot sketch at the end).
- Transpose the pandas DataFrame.
from pyspark.sql import functions as F

sdf = sdf.withColumn("dummy", F.lit("1")) \
         .groupBy("dummy") \
         .agg(*[F.collect_set(c).alias(c) for c in sdf.columns]) \
         .drop("dummy")
[Out]:
+-----+---------------+---------------------+
|col_1|col_2          |col_3                |
+-----+---------------+---------------------+
|[A]  |[C, E, B, A, D]|[1, 5, 2, 6, 3, 7, 4]|
+-----+---------------+---------------------+
pdf = sdf.toPandas() \
.T \
.reset_index() \
.rename(columns={0: "Unique_Values", "index": "Column_Name"})
[Out]:
Column_Name Unique_Values
0 col_1 [A]
1 col_2 [C, E, B, A, D]
2 col_3 [1, 5, 2, 6, 3, 7, 4]
As you can see, the None / null value is not included, because collect_set() drops nulls. To include it, you need some extra processing: cast the columns to type string and fill the nulls with a placeholder. If you want to retain the original types, then you need to track each column's type and cast back appropriately.
for c in sdf.columns:
    sdf = sdf.withColumn(c, F.col(c).cast("string")).na.fill("_NULL_")
and replace the placeholder back after converting to pandas:
pdf["Unique_Values"] = pdf["Unique_Values"].apply(lambda x: [None if v == "_NULL_" else v for v in x])
Full example:
from pyspark.sql import functions as F

data = [("A", "A", 1),
        ("A", "A", 2),
        ("A", "A", 3),
        ("A", "B", 4),
        ("A", "B", 5),
        ("A", "C", 6),
        ("A", "D", 7),
        ("A", "E", None),
        ]
columns = ["col_1", "col_2", "col_3"]
sdf = spark.createDataFrame(data=data, schema=columns)

# Cast every column to string and replace nulls with a placeholder,
# so collect_set() does not silently drop them.
for c in sdf.columns:
    sdf = sdf.withColumn(c, F.col(c).cast("string")).na.fill("_NULL_")

# Collect the unique values of every column into a single summary row.
sdf = sdf.withColumn("dummy", F.lit("1")) \
         .groupBy("dummy") \
         .agg(*[F.collect_set(c).alias(c) for c in sdf.columns]) \
         .drop("dummy")

# Transpose on the pandas side.
pdf = sdf.toPandas() \
         .T \
         .reset_index() \
         .rename(columns={0: "Unique_Values", "index": "Column_Name"})

# Replace the placeholder back with None.
pdf["Unique_Values"] = pdf["Unique_Values"].apply(lambda x: [None if v == "_NULL_" else v for v in x])
[Out]:
Column_Name Unique_Values
0 col_1 [A]
1 col_2 [C, E, B, A, D]
2 col_3 [3, None, 1, 2, 5, 4, 7, 6]
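As an aside, here is a rough sketch of the melt()/unpivot route mentioned above. It only applies to Spark >= 3.4, is an alternative rather than part of the solution above, and reuses the same dummy-column and _NULL_-placeholder tricks; you would still swap the placeholder back afterwards.
# Sketch for Spark >= 3.4 only: unpivot (a.k.a. melt) the frame into
# (Column_Name, value) rows, then collect the distinct values per column.
from pyspark.sql import functions as F

sdf = spark.createDataFrame(data=data, schema=columns)

# Same trick as above: cast to string and fill nulls with a placeholder,
# so all value columns share one type and nulls survive collect_set().
for c in sdf.columns:
    sdf = sdf.withColumn(c, F.col(c).cast("string")).na.fill("_NULL_")

result = (
    sdf.withColumn("dummy", F.lit(1))
       .unpivot("dummy", columns, "Column_Name", "value")
       .drop("dummy")
       .groupBy("Column_Name")
       .agg(F.collect_set("value").alias("Unique_Values"))
)
result.show(truncate=False)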