Home > Enterprise >  Row deduplication problem with daily updated rows. How to avoid counting the same row?
Row deduplication problem with daily updated rows. How to avoid counting the same row?

Time:12-15

This particular dataframe is updated daily with the "Customer ID" ,"status" and the "date" that said update occured, here is an example: example

Some clients receive updates daily, other don't. Some can have a status changed in a matter of days from 'no' to 'yes' and vice versa

Status with yes can be fetched with :

df = df \
    .select('id','status','date') \
    .filter(
        (col('date') >= '2022-10-01') &
        (col('date') <= '2022-10-31') & 
        (col(status) == "yes"))

The second selection must have none of the ID's present in the "yes" query. See ID "123" per example, if i exclued all rows with "yes" i am still counting that client in my "no" part of the query.

Tried using an OVER function to create a flag based on the ID to exclude what i already selected then apply a filter but it does not work, pyspark says that the expression is not supported within a window function.

partition = Window.partitionBy("id").orderBy("date")

df = df \
    .withColumn("results", 
     when((col("status") == "approved").over(partition), '0')
    .otherwise("1"))
Py4JJavaError: An error occurred while calling o808.withColumn.
: org.apache.spark.sql.AnalysisException: Expression '(result_decisaofinal#8593 = APROVA)' not supported within a window function.;;

CodePudding user response:

Following your comment:

Exactly, only one row for each ID following the rule: if Id has one row containing "yes" most recent "yes" else most recent "no"

You can do it with a simple row_number window function, partitioning by customer and ordering by the status (with 'yes' being before 'no' when using desc and date)

from datetime import date
from pyspark.sql import Window, functions as F
data = [
    {'customer': 123, 'status': 'no', 'date': date(2022, 10, 25)},
    {'customer': 123, 'status': 'yes', 'date': date(2022, 10, 22)},
    {'customer': 4141, 'status': 'no', 'date': date(2022, 10, 25)},
    {'customer': 4141, 'status': 'no', 'date': date(2022, 10, 22)},
    {'customer': 4141, 'status': 'no', 'date': date(2022, 10, 15)},
    {'customer': 5555, 'status': 'yes', 'date': date(2022, 10, 25)},
    {'customer': 5555, 'status': 'no', 'date': date(2022, 10, 22)},
    {'customer': 5555, 'status': 'no', 'date': date(2022, 10, 15)},
]
df = spark.createDataFrame(data)

part = Window.partitionBy('customer').orderBy(F.col('status').desc(), F.col('date').desc())
df2 = df.withColumn('rn', F.row_number().over(part)).filter('rn=1').drop('rn')
df2.show(truncate=False)
 -------- ---------- ------ 
|customer|date      |status|
 -------- ---------- ------ 
|123     |2022-10-22|yes   |
|4141    |2022-10-25|no    |
|5555    |2022-10-25|yes   |
 -------- ---------- ------ 

CodePudding user response:

I have one solution which may work but i am not sure if its good solution in terms of time and resources so if anyone knows how to improve it please leave a comment. For this moment i wasnt able to figure out anything else but maybe it will be usefull for you. I have a feeling that there is some trick that i dont know to do it smarter :D

import datetime 
import pyspark.sql.functions as F

x = [(123,"no", datetime.date(2020,10,25)),
    (123,"yes", datetime.date(2020,10,22)),
    (4141,"no", datetime.date(2020,10,25)),
    (4141,"no", datetime.date(2020,10,22)),
    (4141,"no", datetime.date(2020,10,15)),
    (5555,"yes", datetime.date(2020,10,25)),
    (5555,"no", datetime.date(2020,10,22)),
    (5555,"no", datetime.date(2020,10,15))]
df = spark.createDataFrame(x, schema=['customer_id', 'status', 'date'])
groupedDf = df.groupBy(F.col('customer_id'), F.col('status')).agg(F.max("date").alias("most_recent_date")).cache()
trueDf = groupedDf.filter(F.col('status') == F.lit('yes'))
falseDf = groupedDf.filter(F.col('status') == F.lit('no'))
falseWithNoCorrecpondingTrueDf = falseDf.join(trueDf, falseDf.customer_id == trueDf.customer_id, "anti")
finalDf = falseWithNoCorrecpondingTrueDf.union(trueDf)

No need for separate variables for dfs, i added it to make it more descriptive

Description step by step:

  • First i am grouping records to get max date for customer_id and status
  • Then i cache result of grouping as i know that it will be used two times and i dont want to compute it two times
  • I am splitting result of group by into two parts, one with "yes", other with "no"
  • I am dropping "no" which have correspoding "yes" because according to your logic they are not going to be used
  • I am doing a union of "no" which left with all "yes" which should give me resulting df you want to have

Output from sample job:

 ----------- ------ ---------------- 
|customer_id|status|most_recent_date|
 ----------- ------ ---------------- 
|       4141|    no|      2020-10-25|
|        123|   yes|      2020-10-22|
|       5555|   yes|      2020-10-25|
 ----------- ------ ---------------- 
  • Related