I have a data quality task where I need to leave one row in each group unchanged and update the 'Duplicate' column in the Delta table to 'true' for the remaining rows in the group. When the data is first loaded into the table, every value in the 'Duplicate' column is 'false'.
The data is stored in Delta format and I am using Spark in Databricks.
An example is shown below. I want to run a query that updates the 'Duplicate' column to 'true' for all but one row in each group, leaving a single row per group as 'false'. This is so a downstream pipeline can still pick up one of the rows for processing where we have duplicates.
The table starts out like this:
ID | Value1 | Value2 | Duplicate |
---|---|---|---|
23 | a | b | false |
23 | a | b | false |
24 | c | d | false |
25 | e | f | false |
26 | g | h | false |
26 | g | h | false |
and I need to end up with this:
ID | Value1 | Value2 | Duplicate |
---|---|---|---|
23 | a | b | false |
23 | a | b | true |
24 | c | d | false |
25 | e | f | false |
26 | g | h | false |
26 | g | h | true |
I could of course simply run spark.table("myTable").dropDuplicates(), but I would like to assess how big the duplicate problem is: the data comes from an external supplier, and we need to understand whether excessive retries on their system are pushing up costs.
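For example, sizing the problem could be as simple as a grouped count before any flagging (a rough sketch, assuming a duplicate means the full row repeats):

from pyspark.sql import functions as F

# Count how many times each distinct row occurs and keep only the offenders.
dupe_counts = (
    spark.table("myTable")
    .groupBy("ID", "Value1", "Value2")
    .count()
    .filter(F.col("count") > 1)
)
dupe_counts.show()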
I am struggling to find a way to change all but one entry in each group when I do not have a unique identifier to use. I have tried many different approaches without success. Below is roughly what I would like to achieve (this will clearly fail).
WITH CTE AS (
  SELECT
    ID,
    Duplicate,
    row_number() OVER (PARTITION BY ID ORDER BY ID) AS rn
  FROM myTable
)
UPDATE myTable SET Duplicate = 'true' WHERE (SELECT rn FROM CTE) > 1
I am unsure if this is even possible without some unique identifier. If possible, I would like to avoid generating non-deterministic hashes just to accomplish this, as the dataset is very large.
I can use any of Scala, PySpark or SQL within Spark so language isn't vital.
Any pointers would be greatly appreciated.
CodePudding user response:
In PySpark you can use row_number over a window, as below:

from pyspark.sql.functions import row_number, when, col
from pyspark.sql.window import Window

# df1 is the DataFrame read from the Delta table, e.g. df1 = spark.table("myTable")
w = Window.partitionBy("ID").orderBy("ID")

df1.withColumn("rank", row_number().over(w)) \
   .withColumn("duplicate", when(col("rank") > 1, True).otherwise(False)) \
   .drop("rank") \
   .show()
#output
+---+------+------+---------+
| Id|value1|value2|duplicate|
+---+------+------+---------+
| 23|     a|     b|    false|
| 23|     a|     b|     true|
| 24|     c|     d|    false|
| 25|     e|     f|    false|
| 26|     g|     h|    false|
| 26|     g|     h|     true|
+---+------+------+---------+
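Note that this only returns a flagged DataFrame; it does not modify the Delta table itself. To persist the result you could write it back over the table (a sketch, assuming the table is named myTable; Delta's snapshot isolation allows overwriting a table you are reading from):

# Build the flagged DataFrame as above, then overwrite the Delta table.
df2 = (
    df1.withColumn("rank", row_number().over(w))
        .withColumn("duplicate", when(col("rank") > 1, True).otherwise(False))
        .drop("rank")
)
df2.write.format("delta").mode("overwrite").saveAsTable("myTable")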
CodePudding user response:
For SQL, something like this should do the trick.
WITH duplicates AS (
  SELECT
    ID,
    Duplicate,
    ROW_NUMBER() OVER (PARTITION BY ID ORDER BY ID) AS rn
  FROM myTable
)
UPDATE duplicates
SET Duplicate = 'true'
WHERE rn > 1
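Be aware that Databricks SQL does not support UPDATE against a CTE, so the statement above works in engines like SQL Server but not directly on a Delta table. A Delta-friendly alternative (a sketch, assuming Duplicate is a string column as written in the question) is to rewrite the table in place; Delta's snapshot isolation lets you INSERT OVERWRITE a table from a query over itself:

INSERT OVERWRITE TABLE myTable
SELECT
  ID,
  Value1,
  Value2,
  CASE
    WHEN ROW_NUMBER() OVER (PARTITION BY ID ORDER BY ID) > 1 THEN 'true'
    ELSE 'false'
  END AS Duplicate
FROM myTable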