Find columns that are exact duplicates (i.e., that contain duplicate values across all rows) in PySp

Time:10-01

I have a PySpark dataframe with ~4800 columns and am trying to find a way to identify and drop columns that have different column names but that are otherwise exact duplicates of one another.

For example, in the following dataframe, I would want to drop columns C and E (since they are duplicates of column A) and also know that columns C and E were the ones that were dropped.

+---+---+---+---+---+---+
|  A|  B|  C|  D|  E|  F|
+---+---+---+---+---+---+
|  1|  2|  1|  3|  1|  2|
|  1|  1|  1|  2|  1|  2|
|  1|  3|  1|  1|  1|  2|
+---+---+---+---+---+---+

I found a post with a potential solution, but it runs very slowly. Is there a way to optimize it so that it runs faster on a larger dataframe?

CodePudding user response:

A relatively fast way would be to calculate a simple hash of every column and compare the hashes (at the cost of a slight risk of hash collisions, which would make two different columns look identical).

Scala:

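// Imports needed outside spark-shell (assumes a SparkSession named `spark`)
import org.apache.spark.sql.functions._
import spark.implicits._

// Grab a list of columns
val cols = df.columns
// >>> Array(A, B, C, D, E, F)

// Calculate column hashes
// Include a unique id, so that same values in different order get a different hash.
// Hash the id together with the value (as the Python version below does); multiplying
// them would zero out the row with id 0 and would not work for non-numeric columns.
val hashes = df.withColumn("id", monotonically_increasing_id())
  .select(cols.map(c => hash($"id", col(c)).as(c)):_*)
  .agg(sum(lit(0)).as("dummy"), cols.map(c => sum(col(c)).as(c)):_*)
  .drop("dummy")
// >>> Example output (the exact hash values depend on the data and partitioning):
// >>> +----------+-----------+----------+-----------+----------+-----------+
// >>> |         A|          B|         C|          D|         E|          F|
// >>> +----------+-----------+----------+-----------+----------+-----------+
// >>> |-515933930|-1948328522|-515933930|-2768907968|-515933930|-2362158726|
// >>> +----------+-----------+----------+-----------+----------+-----------+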

// Group column names by their hash value
val groups = (hashes.columns zip hashes.head.toSeq).groupBy(_._2).mapValues(_.map(_._1))
// >>> Map(-515933930 -> Array(A, C, E), -1948328522 -> Array(B), -2768907968 -> Array(D), -2362158726 -> Array(F))

// Pick one column for each hash value and discard rest
val columnsToKeep = groups.values.map(_.head)
// >>> List(A, B, D, F)
val columnsToDrop = groups.values.flatMap(_.tail)
// >>> List(C, E)
val finalDf = df.select(columnsToKeep.toSeq.map(col):_*)
// >>> +---+---+---+---+
// >>> |  A|  B|  D|  F|
// >>> +---+---+---+---+
// >>> |  1|  2|  3|  2|
// >>> |  1|  1|  2|  2|
// >>> |  1|  3|  1|  2|
// >>> +---+---+---+---+

Python:

from pyspark.sql import functions as F

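# Sum the per-row hashes of (_id, value) for every column. The columns are listed
# in reverse order so that, in the dict below, the first column of each duplicate
# group is the one that survives.
df_hashes = (df
    .withColumn("_id", F.monotonically_increasing_id())
    .agg(*[F.sum(F.hash("_id", c)).alias(c) for c in df.columns[::-1]])
)
# Map hash -> column name; later entries overwrite earlier ones, so each hash keeps one column
keep = dict(zip(df_hashes.head(), df_hashes.columns)).values()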

cols_to_keep = [c for c in df.columns if c in keep]
# ['A', 'B', 'D', 'F']

cols_to_drop = set(df.columns) - set(keep)
# {'C', 'E'}

# show() returns None, so keep the dataframe and display it in a separate step
df_final = df.select(cols_to_keep)
df_final.show()
# +---+---+---+---+
# |  A|  B|  D|  F|
# +---+---+---+---+
# |  1|  2|  3|  2|
# |  1|  1|  2|  2|
# |  1|  3|  1|  2|
# +---+---+---+---+
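
To see why the row id is mixed into each hash, here is a minimal plain-Python sketch (no Spark involved). The helpers `cell_hash` and `column_fingerprint` are hypothetical names, and SHA-256 is only a stand-in for Spark's `hash` function, so the numbers are not comparable with Spark's. It shows that a plain sum of per-value hashes ignores row order, while hashing each (row id, value) pair does not.

```python
import hashlib

def cell_hash(*parts):
    """Deterministic 64-bit hash of the given values (stand-in for Spark's hash())."""
    digest = hashlib.sha256(repr(parts).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")

def column_fingerprint(values, with_row_id=True):
    """Sum the per-row hashes; optionally mix the row index into each hash."""
    if with_row_id:
        return sum(cell_hash(i, v) for i, v in enumerate(values))
    return sum(cell_hash(v) for v in values)

col_x = [1, 2, 3]
col_y = [3, 2, 1]  # same values as col_x, but in a different row order

# Without the row id the sum is order-insensitive, so the two columns look identical.
same_without_id = column_fingerprint(col_x, False) == column_fingerprint(col_y, False)
# With the row id each (row, value) pair is hashed, so reordering changes the sum.
same_with_id = column_fingerprint(col_x, True) == column_fingerprint(col_y, True)

print(same_without_id, same_with_id)
```

Equal fingerprints still only suggest equal columns; if a false match would be costly, compare the candidate columns directly before dropping them.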

CodePudding user response:

I've rewritten Kombajans solution in PySpark. Very cool idea!

#setup

from datetime import datetime
from pyspark.sql.functions import col, hash, sum, monotonically_increasing_id, lit

now_timestamp = datetime.now()
now_timestamp2 = datetime.now()

# columns a-e are the same as a2-e2. a3-e3 are different.
df = spark.createDataFrame(
    data=[
        (2,'b', True, now_timestamp, 0.001, 2,'b', True, now_timestamp, 0.001, 3,'c', True, now_timestamp2, None),
        (2,'b', True, now_timestamp, 0.001, 2,'b', True, now_timestamp, 0.001, 3,'c', True, now_timestamp2, None),
        (2,'c', True, now_timestamp, 0.001, 2,'c', True, now_timestamp, 0.001, 3,'c', False, now_timestamp, 0.001),
        (None, None, None, None, None, None, None, None, None, None, None, None, None, None, 0.001),
        (None, None, None, None, None, None, None, None, None, None, None, None, None, None, 0.001),
    ],
    schema="""a integer, b string, c boolean, d timestamp, e float,
            a2 integer, b2 string, c2 boolean, d2 timestamp, e2 float,
            a3 integer, b3 string, c3 boolean, d3 timestamp, e3 float"""
)

# Hash monotonically_increasing_id logic
col_hashes = df \
.withColumn("id", monotonically_increasing_id()) \
.select(
    *[hash('id', c).alias(c) for c in df.columns]   
) \
.agg(
   *[sum(c).alias(c) for c in df.columns] 
)

# move to pandas (should be a small df so you can work locally)
# transpose and remove duplicate hashes
p = col_hashes.toPandas().transpose().reset_index()
p.columns=['column', 'hash']

columns_to_keep = p.drop_duplicates(['hash'])['column'].tolist()

df.select(columns_to_keep).show()

+----+----+----+--------------------+-----+----+----+-----+--------------------+-----+
|   a|   b|   c|                   d|    e|  a3|  b3|   c3|                  d3|   e3|
+----+----+----+--------------------+-----+----+----+-----+--------------------+-----+
|   2|   b|true|2022-09-30 08:38:...|0.001|   3|   c| true|2022-09-30 08:38:...| null|
|   2|   b|true|2022-09-30 08:38:...|0.001|   3|   c| true|2022-09-30 08:38:...| null|
|   2|   c|true|2022-09-30 08:38:...|0.001|   3|   c|false|2022-09-30 08:38:...|0.001|
|null|null|null|                null| null|null|null| null|                null|0.001|
|null|null|null|                null| null|null|null| null|                null|0.001|
+----+----+----+--------------------+-----+----+----+-----+--------------------+-----+
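
The question also asks which columns were dropped. As a possible alternative to the pandas step, the sketch below groups column names by their hash in plain Python and records, for every dropped column, the kept column it duplicates. `group_duplicate_columns` is a hypothetical helper; with the code above it would presumably be called as `group_duplicate_columns(df.columns, col_hashes.head().asDict())`. The example input reuses the hash values from the A-F example earlier in the thread.

```python
from collections import defaultdict

def group_duplicate_columns(columns, hash_row):
    """Group column names by fingerprint, keeping the original column order.

    columns  -- column names in their original order (e.g. df.columns)
    hash_row -- mapping of column name -> fingerprint
    Returns (keep, dropped), where dropped maps each removed column
    to the kept column it duplicates.
    """
    groups = defaultdict(list)
    for name in columns:
        groups[hash_row[name]].append(name)

    keep, dropped = [], {}
    for names in groups.values():  # dicts preserve insertion order (Python 3.7+)
        first, *rest = names
        keep.append(first)
        for name in rest:
            dropped[name] = first
    return keep, dropped

# Hash values copied from the A-F example output
example_hashes = {
    "A": -515933930, "B": -1948328522, "C": -515933930,
    "D": -2768907968, "E": -515933930, "F": -2362158726,
}
keep, dropped = group_duplicate_columns(list("ABCDEF"), example_hashes)
print(keep)     # ['A', 'B', 'D', 'F']
print(dropped)  # {'C': 'A', 'E': 'A'}
```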


CodePudding user response:

I use pandas to transpose the data and drop the duplicates: toPandas() converts the Spark dataframe to a pandas dataframe, which is then transposed so that duplicate columns become duplicate rows. For 315 columns and 100K records it took 30 seconds. I'm not sure how it will behave for millions of records.

import pandas as pd

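# file_type and file_location are not defined in this snippet; set them for your
# data first (file_type is presumably "csv", given the options below).

# CSV options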
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

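# Collect everything to the driver as a pandas DataFrame (must fit in driver memory)
pandasDF = df.toPandas()
# Transpose so that columns become rows, then drop the duplicated rows
pandasDFT = pandasDF.T.drop_duplicates()
# Transpose back to the original orientation
print(pandasDFT.T)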
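
Because toPandas() collects the whole dataframe onto the driver, the limiting factor for larger inputs is likely to be driver memory. Here is a back-of-the-envelope sketch, assuming roughly 8 bytes per cell. That figure is an assumption: string columns, which is what inferSchema = "false" produces, usually need more, and the transposed copy adds to the peak. `estimate_dense_bytes` is a hypothetical helper, and the 10 million row count is made up for illustration.

```python
def estimate_dense_bytes(n_rows, n_cols, bytes_per_cell=8):
    """Rough lower bound for a dense table; bytes_per_cell=8 is an assumption."""
    return n_rows * n_cols * bytes_per_cell

def to_gib(n_bytes):
    """Convert a byte count to GiB."""
    return n_bytes / 1024**3

# The case measured above: 315 columns, 100K records
small = estimate_dense_bytes(100_000, 315)
# The question's 4800 columns with a hypothetical 10 million rows
large = estimate_dense_bytes(10_000_000, 4800)

print(f"{to_gib(small):.2f} GiB")
print(f"{to_gib(large):.2f} GiB")
```

If the second estimate exceeds what the driver can hold, the hash-based approaches above are the safer choice, since they only bring a single row of hashes back to the driver.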