I have data like this:
[
{"uuid":"fdkhflds","key": "A", "id": "1"},
{"uuid":"ieuieiue","key": "A", "id": "2"},
{"uuid":"qwtriqrr","key": "A", "id": "3"},
{"uuid":"dhgfsddd","key": "A", "id": "1"},
{"uuid":"sdjhfdjh","key": "E", "id": "4"}
]
I want to add a flag to those records where the key is the same but the id is different.
Expected output:
[
{"uuid":"fdkhflds","key": "A", "id": "1","de_dupe_required": 0},
{"uuid":"ieuieiue","key": "A", "id": "2","de_dupe_required": 1},
{"uuid":"qwtriqrr","key": "A", "id": "3","de_dupe_required": 1},
{"uuid":"dhgfsddd","key": "A", "id": "1","de_dupe_required": 0},
{"uuid":"sdjhfdjh","key": "E", "id": "4","de_dupe_required": 0}
]
Explanation:
- Since the first and fourth records have the same key and id, no flag is needed for them.
- Since the fifth record shares neither its key nor its id with any other record, no flag is needed for it either.
- Since the second and third records have the same key as others but different ids, their flag should be 1.
CodePudding user response:
You could achieve this with pyspark.sql.Window by generating a rank() for each key ordered by the id, then marking de_dupe_required as 1 wherever the rank() is not 1.
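For reference, the sample input can be loaded into a DataFrame named df along these lines (a minimal sketch, not part of the original answer; it assumes an active SparkSession called spark and keeps id as a string, as in the question's JSON):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Recreate the question's sample records as (uuid, key, id) tuples.
df = spark.createDataFrame(
    [
        ("fdkhflds", "A", "1"),
        ("ieuieiue", "A", "2"),
        ("qwtriqrr", "A", "3"),
        ("dhgfsddd", "A", "1"),
        ("sdjhfdjh", "E", "4"),
    ],
    ["uuid", "key", "id"],
)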
from pyspark.sql import functions as F, Window

# Rank rows within each key, ordered by id; ties on the smallest id all get rank 1.
window_spec = Window.partitionBy("key").orderBy("id")

df = (
    df.withColumn("dupe_rank", F.rank().over(window_spec))
      .withColumn("de_dupe_required", F.when(F.col("dupe_rank") == 1, F.lit(0)).otherwise(F.lit(1)))
      .drop("dupe_rank")
)
df.show()
Output is:
+--------+---+---+----------------+
|    uuid|key| id|de_dupe_required|
+--------+---+---+----------------+
|fdkhflds|  A|  1|               0|
|dhgfsddd|  A|  1|               0|
|ieuieiue|  A|  2|               1|
|qwtriqrr|  A|  3|               1|
|sdjhfdjh|  E|  4|               0|
+--------+---+---+----------------+
Note that this will still work if there are combinations like two (A,3) rows (as noted by @thebluephantom): since we order by id, the rank will be greater than 1 for these rows. (A small sketch reproducing this case follows the output below.)
Output for two (A,3):
+--------+---+---+----------------+
|    uuid|key| id|de_dupe_required|
+--------+---+---+----------------+
|fdkhflds|  A|  1|               0|
|dhgfsddd|  A|  1|               0|
|ieuieiue|  A|  2|               1|
|qwtriqrr|  A|  3|               1|
|qwtriqrr|  A|  3|               1|
|sdjhfdjh|  E|  4|               0|
+--------+---+---+----------------+
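As a quick sketch (not part of the original answer), that two-(A,3) scenario can be reproduced by rebuilding the sample input with one extra (A, 3) row and re-running the same window logic; the repeated uuid is hypothetical and only for illustration:

# Sample input with a duplicated (A, 3) combination (hypothetical extra row).
df_dup = spark.createDataFrame(
    [
        ("fdkhflds", "A", "1"),
        ("ieuieiue", "A", "2"),
        ("qwtriqrr", "A", "3"),
        ("qwtriqrr", "A", "3"),
        ("dhgfsddd", "A", "1"),
        ("sdjhfdjh", "E", "4"),
    ],
    ["uuid", "key", "id"],
)

# Same ranking logic as above: both (A, 3) rows get a rank greater than 1.
df_dup = (
    df_dup.withColumn("dupe_rank", F.rank().over(window_spec))
          .withColumn("de_dupe_required", F.when(F.col("dupe_rank") == 1, F.lit(0)).otherwise(F.lit(1)))
          .drop("dupe_rank")
)
df_dup.show()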
CodePudding user response:
The question is a little vague. This is my solution, which allows for two (A,3) rows being possible, and so does not follow the first answer's approach.
%python
from pyspark.sql.functions import col, lit

df = spark.createDataFrame(
    [
        ("A", 1, "xyz"),
        ("A", 2, "xyz"),
        ("A", 3, "xyz"),
        ("A", 3, "xyz"),
        ("A", 1, "xyz"),
        ("E", 4, "xyz"),
        ("A", 9, "xyz"),
    ],
    ["c1", "c2", "c3"],
)

# (c1, c2) combinations that occur exactly once
df2 = df.groupBy("c1", "c2").count().filter(col("count") == 1)
# keys that have only one such uniquely-occurring combination
df3 = df2.groupBy("c1").count().filter(col("count") == 1)
# uniquely-occurring combinations whose key has more than one of them -> flag 1
df4 = df2.join(df3, df3.c1 == df2.c1, "leftanti").select("c1", "c2", lit(1)).toDF("c1", "c2", "ddr")
# all remaining rows -> flag 0
dfA = df.select("c1", "c2")
dfB = df4.select("c1", "c2")
df5 = dfA.exceptAll(dfB)
res = df4.withColumn("ddr", lit(1)).unionAll(df5.withColumn("ddr", lit(0)))
res.show()
returns:
+---+---+---+
| c1| c2|ddr|
+---+---+---+
|  A|  2|  1|
|  A|  9|  1|
|  A|  1|  0|
|  A|  1|  0|
|  A|  3|  0|
|  A|  3|  0|
|  E|  4|  0|
+---+---+---+
It's about the algorithm; you can do the rest. It needs to be a step-wise approach.
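As one possible sketch of "the rest" (not part of the answer above): since ddr is fully determined by the (c1, c2) pair, the flag table can be de-duplicated and joined back onto the original rows to keep the remaining columns such as c3:

# Attach the ddr flag back to the full rows. ddr depends only on (c1, c2),
# so drop duplicate flag rows first to avoid multiplying the joined output.
flags = res.dropDuplicates(["c1", "c2"])
flagged = df.join(flags, ["c1", "c2"], "left")
flagged.show()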