Spark add duplicate only when one column is same and other is different


I have data like this

[
{"uuid":"fdkhflds","key": "A", "id": "1"},
{"uuid":"ieuieiue","key": "A", "id": "2"},
{"uuid":"qwtriqrr","key": "A", "id": "3"},
{"uuid":"dhgfsddd","key": "A", "id": "1"},
{"uuid":"sdjhfdjh","key": "E", "id": "4"}
]

I want to add a flag to those rows where the key is the same but the id is different.

Expected output:

[
{"uuid":"fdkhflds","key": "A", "id": "1","de_dupe_required": 0},
{"uuid":"ieuieiue","key": "A", "id": "2","de_dupe_required": 1},
{"uuid":"qwtriqrr","key": "A", "id": "3","de_dupe_required": 1},
{"uuid":"dhgfsddd","key": "A", "id": "1","de_dupe_required": 0},
{"uuid":"sdjhfdjh","key": "E", "id": "4","de_dupe_required": 0}
]

Explanation:

  1. Since the first and fourth records have the same key and id, no flag is needed.
  2. Since the fifth record shares neither key nor id with any other record, no flag is needed for it either.
  3. Since the second and third records have the same key but a different id, their flag should be 1.

CodePudding user response:

You could achieve this with a pyspark.sql.Window by generating a rank() within each key, ordered by id, and then setting de_dupe_required to 1 wherever the rank() is not 1.
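
For reference, the snippet below assumes df already holds the question's records; a minimal, purely illustrative sketch to build such a DataFrame from the sample data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample records from the question; replace with your real source.
df = spark.createDataFrame(
    [
        ("fdkhflds", "A", "1"),
        ("ieuieiue", "A", "2"),
        ("qwtriqrr", "A", "3"),
        ("dhgfsddd", "A", "1"),
        ("sdjhfdjh", "E", "4"),
    ],
    ["uuid", "key", "id"],
)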

from pyspark.sql import functions as F, Window

# Rank rows within each key by id; rows sharing the smallest id all get rank 1.
window_spec = Window.partitionBy("key").orderBy("id")

df = (df.withColumn("dupe_rank", F.rank().over(window_spec))
      # Flag everything that does not carry the smallest id for its key.
      .withColumn("de_dupe_required", F.when(F.col("dupe_rank") == 1, F.lit(0))
                  .otherwise(F.lit(1)))
      .drop("dupe_rank")
     )

df.show()

Output is:

+--------+---+---+----------------+
|    uuid|key| id|de_dupe_required|
+--------+---+---+----------------+
|fdkhflds|  A|  1|               0|
|dhgfsddd|  A|  1|               0|
|ieuieiue|  A|  2|               1|
|qwtriqrr|  A|  3|               1|
|sdjhfdjh|  E|  4|               0|
+--------+---+---+----------------+

Note that this still works if there are combinations such as two (A, 3) rows (as noted by @thebluephantom): since we order by id, the rank will be greater than 1 for those rows.

Output for two (A,3):

+--------+---+---+----------------+
|    uuid|key| id|de_dupe_required|
+--------+---+---+----------------+
|fdkhflds|  A|  1|               0|
|dhgfsddd|  A|  1|               0|
|ieuieiue|  A|  2|               1|
|qwtriqrr|  A|  3|               1|
|qwtriqrr|  A|  3|               1|
|sdjhfdjh|  E|  4|               0|
+--------+---+---+----------------+
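
As a side note (not part of the original answer), an equivalent way to express the same rule is to compare each id with the smallest id in its key partition; this sketch makes the same assumption as the rank() version, namely that the rows carrying the smallest id per key are the ones to keep:

from pyspark.sql import functions as F, Window

key_window = Window.partitionBy("key")

# A row is flagged when its id is not the smallest id within its key group.
df_flagged = df.withColumn(
    "de_dupe_required",
    F.when(F.col("id") == F.min("id").over(key_window), F.lit(0)).otherwise(F.lit(1)),
)
df_flagged.show()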

CodePudding user response:

The question is vague. This is my solution; it treats two (A, 3) rows as possible, and therefore does not behave like the first answer.

%python
from pyspark.sql.functions import col, lit

df = spark.createDataFrame(
    [
       ("A", 1, "xyz"),
       ("A", 2, "xyz"),
       ("A", 3, "xyz"),
       ("A", 3, "xyz"),
       ("A", 1, "xyz"),
       ("E", 4, "xyz"),
       ("A", 9, "xyz")
    ],
    ["c1", "c2", "c3"]
)

# (c1, c2) combinations that occur exactly once
df2 = df.groupBy("c1", "c2").count().filter(col('count') == 1)
# keys that have exactly one such singleton combination
df3 = df2.groupBy("c1").count().filter(col('count') == 1)
# singletons belonging to keys with more than one singleton combination: these get ddr = 1
df4 = df2.join(df3, df3.c1 == df2.c1, "leftanti").select("c1", "c2", lit(1)).toDF("c1", "c2", "ddr")
# all remaining (c1, c2) rows keep ddr = 0
dfA = df.select("c1", "c2")
dfB = df4.select("c1", "c2")
df5 = dfA.exceptAll(dfB)
res = df4.withColumn("ddr", lit(1)).unionAll(df5.withColumn("ddr", lit(0)))
res.show()

returns:

+---+---+---+
| c1| c2|ddr|
+---+---+---+
|  A|  2|  1|
|  A|  9|  1|
|  A|  1|  0|
|  A|  1|  0|
|  A|  3|  0|
|  A|  3|  0|
|  E|  4|  0|
+---+---+---+

This is about the algorithm; you can work out the rest. It needs to be a step-wise approach.
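
If the flag then needs to be attached back to the full original rows, one possible follow-up sketch (not part of the answer above) is to join res back on (c1, c2), assuming each such pair maps to exactly one ddr value, which holds by construction here:

# One row of flags per distinct (c1, c2) pair.
flags = res.dropDuplicates(["c1", "c2"])

# Attach the flag to every original row, keeping the other columns (e.g. c3).
df_flagged = df.join(flags, ["c1", "c2"], "left")
df_flagged.show()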
