How to get unique values of every column in PySpark DataFrame and save the results in a DataFrame?

Assume that I have a Spark DataFrame as below:

data = [("A", "A", 1), \
    ("A", "A", 2), \
    ("A", "A", 3), \
    ("A", "B", 4), \
    ("A", "B", 5), \
    ("A", "C", 6), \
    ("A", "D", 7), \
    ("A", "E", None), \
    ]

columns= ["col_1", "col_2", "col_3"]
df = spark.createDataFrame(data = data, schema = columns)

I want to get the list of unique entries for every column and save the results in a DataFrame. The expected output is:

Column_Name  Unique_Values
col_1        ['A']
col_2        ['A', 'B', 'C', 'D', 'E']
col_3        [1, 2, 3, 4, 5, 6, 7, None]

Any idea how to do that?

CodePudding user response:

One way to achieve this is:

  • Introduce a dummy column with some constant value.
  • Group by this dummy column and collect every column as a set.
  • Since the result is "summary" data, it won't hurt to convert it to pandas. Transposing a DataFrame is a bit too much work in Spark < 3.4, where the melt() function is not available (a melt()-based sketch for newer versions follows at the end of this answer).
  • Transpose the pandas DataFrame.
from pyspark.sql import functions as F

# sdf is the question's DataFrame (df above)
sdf = sdf.withColumn("dummy", F.lit("1")) \
         .groupBy("dummy") \
         .agg(*[F.collect_set(c).alias(c) for c in sdf.columns]) \
         .drop("dummy")

[Out]:
+-----+---------------+---------------------+
|col_1|col_2          |col_3                |
+-----+---------------+---------------------+
|[A]  |[C, E, B, A, D]|[1, 5, 2, 6, 3, 7, 4]|
+-----+---------------+---------------------+


pdf = sdf.toPandas() \
         .T \
         .reset_index() \
         .rename(columns={0: "Unique_Values", "index": "Column_Name"})
[Out]:
  Column_Name          Unique_Values
0       col_1                    [A]
1       col_2        [C, E, B, A, D]
2       col_3  [1, 5, 2, 6, 3, 7, 4]

As you can see, the None/null is not included: collect_set silently drops nulls (and, as the output above shows, it makes no ordering guarantee either). To include nulls you need some extra processing: cast the columns to string and replace nulls with a placeholder value. If you want to retain the original types, you need to track each column's type and cast the values back appropriately; a sketch of that follows the replacement step below.

# Cast every column to string, then replace nulls with a sentinel value
# so that collect_set keeps them:
for c in sdf.columns:
    sdf = sdf.withColumn(c, F.col(c).cast("string")).na.fill("_NULL_")

and replace the placeholder back with None on the pandas side:

pdf["Unique_Values"] = pdf["Unique_Values"].apply(lambda x: [None if v == "_NULL_" else v for v in x])

Full example:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data = [("A", "A", 1),
        ("A", "A", 2),
        ("A", "A", 3),
        ("A", "B", 4),
        ("A", "B", 5),
        ("A", "C", 6),
        ("A", "D", 7),
        ("A", "E", None)]

columns = ["col_1", "col_2", "col_3"]
sdf = spark.createDataFrame(data=data, schema=columns)

# Cast to string and replace nulls with the "_NULL_" sentinel.
for c in sdf.columns:
    sdf = sdf.withColumn(c, F.col(c).cast("string")).na.fill("_NULL_")

sdf = sdf.withColumn("dummy", F.lit("1")) \
         .groupBy("dummy") \
         .agg(*[F.collect_set(c).alias(c) for c in sdf.columns]) \
         .drop("dummy")

pdf = sdf.toPandas() \
         .T \
         .reset_index() \
         .rename(columns={0: "Unique_Values", "index": "Column_Name"})

pdf["Unique_Values"] = pdf["Unique_Values"].apply(lambda x: [None if v == "_NULL_" else v for v in x])

[Out]:
  Column_Name                Unique_Values
0       col_1                          [A]
1       col_2              [C, E, B, A, D]
2       col_3  [3, None, 1, 2, 5, 4, 7, 6]
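
For completeness: on Spark >= 3.4, where melt() is available, the whole thing can stay in Spark, as mentioned in the bullet list above. A sketch, assuming the same string-cast and "_NULL_" placeholder trick (melt() needs the unpivoted columns to share one type, and collect_set still drops nulls):

# Spark >= 3.4 sketch, starting again from the original (un-aggregated) sdf.
melted = (
    sdf.select([F.col(c).cast("string").alias(c) for c in sdf.columns])
       .na.fill("_NULL_")
       .melt(ids=[], values=None,
             variableColumnName="Column_Name",
             valueColumnName="value")
)

result = melted.groupBy("Column_Name") \
               .agg(F.collect_set("value").alias("Unique_Values"))

# Swap the placeholder back to null inside the arrays (F.transform, Spark 3.1+).
result = result.withColumn(
    "Unique_Values",
    F.transform("Unique_Values", lambda v: F.when(v == "_NULL_", None).otherwise(v)),
)
result.show(truncate=False)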