Find the min date (closest to current date) if dates lie in the future and the max date (closest to current date) if dates lie in the past


The below scenario needs to be implemented in SQL:

Group by "Sl.No", compare the "Date" column to the current date (6/17/2022), and select one row to represent each group using the conditions below (a plain-Python sketch of this rule follows the expected output):

  1. If all the dates are in the future, pick the one nearest to the current date.
  2. If the dates are in the past, pick the one nearest to the current date.

Here is the sample data:

Sl.No  Date       Status  Flag
1      8/25/2022  1       Y
1      6/17/2022  0       N
1      8/24/2022  0       Y
1      6/20/2022  1       N
2      6/28/2019  1       N
2      6/11/2019  1       N
2      6/30/2019  1       Y
3      7/25/2023  1       Y
3      6/17/2023  0       Y
3      8/14/2022  0       N
3      8/5/2023   0       N

Expected output:

Sl.No  Date       Status  Flag
1      6/20/2022  1       N
2      6/30/2019  1       Y
3      8/14/2022  0       N
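
Note that the two conditions as written are ambiguous for a group that mixes past and future dates (group 1 here). Reading from the expected output, the effective rule appears to be: prefer the strictly-future date nearest to the current date, and fall back to the nearest past date only when the group has no future dates. A minimal plain-Python sketch of that reading (the helper name is hypothetical):

from datetime import date

def pick_representative(rows, target):
    """rows is a list of (date, status, flag) tuples. Prefer the future
    date nearest to target; otherwise take the past date nearest to it."""
    future = [r for r in rows if r[0] > target]
    if future:
        return min(future, key=lambda r: r[0])  # earliest future date = nearest
    past = [r for r in rows if r[0] < target]
    return max(past, key=lambda r: r[0])        # latest past date = nearest

target = date(2022, 6, 17)
group1 = [(date(2022, 8, 25), 1, "Y"), (date(2022, 6, 17), 0, "N"),
          (date(2022, 8, 24), 0, "Y"), (date(2022, 6, 20), 1, "N")]
print(pick_representative(group1, target))  # (datetime.date(2022, 6, 20), 1, 'N')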

CodePudding user response:

Please refer to the query below:

with cte as (
  -- absolute distance in days between each row's date and the current date
  select slno, dates, status, flag,
         abs(datediff(now(), dates)) as ddiff
  from near_date
),
cte_1 as (
  -- smallest nonzero distance per group
  select slno, min(ddiff) as mdiff
  from cte
  where ddiff > 0
  group by slno
)
select cte.slno, cte.dates, cte.status, cte.flag
from cte
join cte_1
  on cte.slno = cte_1.slno
 and cte.ddiff = cte_1.mdiff;

DB fiddle here.
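
The query above assumes a table near_date with columns slno, dates, status, and flag. For anyone comparing it against the Spark answer below on the same data, here is a sketch of the same query run through Spark SQL, with now() pinned to the question's current date (2022-06-17) so the result is reproducible (table and column names are carried over from the answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [(1, "8/25/2022", 1, "Y"), (1, "6/17/2022", 0, "N"),
        (1, "8/24/2022", 0, "Y"), (1, "6/20/2022", 1, "N"),
        (2, "6/28/2019", 1, "N"), (2, "6/11/2019", 1, "N"),
        (2, "6/30/2019", 1, "Y"), (3, "7/25/2023", 1, "Y"),
        (3, "6/17/2023", 0, "Y"), (3, "8/14/2022", 0, "N"),
        (3, "8/5/2023", 0, "N")]
spark.createDataFrame(data, ["slno", "dates", "status", "flag"]) \
     .createOrReplaceTempView("near_date")

spark.sql("""
    with cte as (
      select slno, to_date(dates, 'M/d/yyyy') as dates, status, flag,
             abs(datediff(date'2022-06-17', to_date(dates, 'M/d/yyyy'))) as ddiff
      from near_date
    ), cte_1 as (
      select slno, min(ddiff) as mdiff
      from cte
      where ddiff > 0
      group by slno
    )
    select cte.slno, cte.dates, cte.status, cte.flag
    from cte join cte_1
      on cte.slno = cte_1.slno and cte.ddiff = cte_1.mdiff
""").show()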

CodePudding user response:

If you can fit the entire group into the memory of an executor, then here's a solution for you.

import pyspark.sql.functions as f

target = "6/17/2022"
df = spark.createDataFrame( data=[
 (1,"8/25/2022",1,"Y"),
 (1,"6/17/2022",0,"N"),
 (1,"8/24/2022",0,"Y"),
 (1,"6/20/2022",1,"N"),
 (2,"6/28/2019",1,"N"),
 (2,"6/11/2019",1,"N"),
 (2,"6/30/2019",1,"Y"),
 (3,"7/25/2023",1,"Y"),
 (3,"6/17/2023",0,"Y"),
 (3,"8/14/2022",0,"N"),
 (3,"8/5/2023",0,"N"),],
schema = ["Sl_No","Date","status","flag"]
).withColumn("date", f.to_date(f.col("Date"), "MM/dd/yyyy") #convert to date
 ).withColumn("target", f.to_date(f.lit(target), "MM/dd/yyyy")) #add target row
grpd = df.groupby("Sl_No"
 ).agg(
  f.reverse(f.sort_array(  # sort descending; structs compare field by field, and since "Sl_No" is constant within a group, the second field (date) drives the order
   f.collect_list(  # collect every row of the group into one array -- it MUST FIT IN MEMORY
    f.struct(  # wrap each row in a struct so all of its columns stay together
     *[f.col(column) for column in df.columns]  # shorthand to pass varargs of all columns
    )
   ))).alias("grouped_rows")
 )
grpd.select(
 f.when(
   f.col("grouped_rows")[0].Date > f.col("grouped_rows")[0].target,  # condition 1: the group's latest date is in the future
   f.expr("sort_array(filter( grouped_rows, x -> x.Date > x.target ))")[0]  # drop past dates, sort ascending, take the nearest future date
 ).otherwise(  # condition 2: only past (or current) dates remain
  f.expr("filter( grouped_rows, x -> x.Date <= x.target )")[0]  # array is already sorted descending, so the first kept date is the nearest
 ).alias("rep")
 ).select(
  f.col("rep.*")  # turn the struct's fields into top-level columns
 ).show()
+-----+----------+------+----+----------+
|Sl_No|      date|status|flag|    target|
+-----+----------+------+----+----------+
|    1|2022-06-20|     1|   N|2022-06-17|
|    3|2022-08-14|     0|   N|2022-06-17|
|    2|2019-06-30|     1|   Y|2022-06-17|
+-----+----------+------+----+----------+

The explain plan for this code has a single shuffle (one Exchange):

== Physical Plan ==
Project [CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.Sl_No AS Sl_No#996L, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.date AS date#997, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.status AS status#998L, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.flag AS flag#999, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.target AS target#1000]
+- ObjectHashAggregate(keys=[Sl_No#708L], functions=[collect_list(named_struct(Sl_No, Sl_No#708L, date, date#716, status, status#710L, flag, flag#711, target, 19160), 0, 0)])
   +- Exchange hashpartitioning(Sl_No#708L, 200)
      +- ObjectHashAggregate(keys=[Sl_No#708L], functions=[partial_collect_list(named_struct(Sl_No, Sl_No#708L, date, date#716, status, status#710L, flag, flag#711, target, 19160), 0, 0)])
         +- *(1) Project [Sl_No#708L, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date) AS date#716, status#710L, flag#711]
            +- Scan ExistingRDD[Sl_No#708L,Date#709,status#710L,flag#711]
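
As an aside, shuffles show up as Exchange nodes in the physical plan, so they can be counted programmatically. A small hypothetical helper (it reaches through the private _jdf handle, so this is a sketch rather than a stable API):

def count_shuffles(df):
    # Exchange operators in the executed plan correspond to shuffles.
    plan = df._jdf.queryExecution().executedPlan().toString()
    return plan.count("Exchange")

# e.g. count_shuffles(grpd) on the aggregation above should report 1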

The explain plan for the SQL from the other answer has three shuffles (three Exchanges):

== Physical Plan ==
*(6) Project [slno#980L, date#716, status#710L, flag#711]
+- *(6) SortMergeJoin [slno#980L, ddiff#981], [slno#984L, mdiff#982], Inner
   :- *(2) Sort [slno#980L ASC NULLS FIRST, ddiff#981 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(slno#980L, ddiff#981, 200)
   :     +- *(1) Project [Sl_No#708L AS slno#980L, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date) AS date#716, status#710L, flag#711, abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date))) AS ddiff#981]
   :        +- *(1) Filter (isnotnull(Sl_No#708L) && isnotnull(abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date)))))
   :           +- Scan ExistingRDD[Sl_No#708L,Date#709,status#710L,flag#711]
   +- *(5) Sort [slno#984L ASC NULLS FIRST, mdiff#982 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(slno#984L, mdiff#982, 200)
         +- *(4) Filter isnotnull(mdiff#982)
            +- *(4) HashAggregate(keys=[slno#984L], functions=[min(ddiff#985)])
               +- Exchange hashpartitioning(slno#984L, 200)
                  +- *(3) HashAggregate(keys=[slno#984L], functions=[partial_min(ddiff#985)])
                     +- *(3) Project [Sl_No#708L AS slno#984L, abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date))) AS ddiff#985]
                        +- *(3) Filter ((abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date))) > 0) && isnotnull(Sl_No#708L))
                           +- Scan ExistingRDD[Sl_No#708L,Date#709,status#710L,flag#711]
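
Not part of either answer, but for completeness: the same representative row can be picked with a window function, which avoids collecting each group into executor memory and still needs only the one shuffle for the partitioning. A sketch, assuming the df with the parsed date and target columns built in the answer above:

from pyspark.sql import Window

w = Window.partitionBy("Sl_No").orderBy(
    f.when(f.col("date") > f.col("target"), 0).otherwise(1),  # future dates sort first
    f.abs(f.datediff(f.col("date"), f.col("target")))         # then nearest to the target
)

rep = (df.where(f.col("date") != f.col("target"))  # mirror the ddiff > 0 filter
         .withColumn("rn", f.row_number().over(w))
         .where(f.col("rn") == 1)
         .drop("rn"))
rep.show()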