The scenario below needs to be implemented in SQL:
Group the rows by "Sl.No", compare the "Date" column to the current date (6/17/2022), and select one row to represent each group, under these conditions (see the plain-Python sketch after the expected output):
- If all of the group's dates are in the future, pick the date nearest to the current date.
- If the dates are in the past, pick the past date nearest to the current date.
For example, every date in group 2 is in the past, so its latest date, 6/30/2019, represents that group.
Here is the sample data -
Sl.No | Date | status | flag |
---|---|---|---|
1 | 8/25/2022 | 1 | Y |
1 | 6/17/2022 | 0 | N |
1 | 8/24/2022 | 0 | Y |
1 | 6/20/2022 | 1 | N |
2 | 6/28/2019 | 1 | N |
2 | 6/11/2019 | 1 | N |
2 | 6/30/2019 | 1 | Y |
3 | 7/25/2023 | 1 | Y |
3 | 6/17/2023 | 0 | Y |
3 | 8/14/2022 | 0 | N |
3 | 8/5/2023 | 0 | N |
Expected output
Sl.No | Date | status | flag |
---|---|---|---|
1 | 6/20/2022 | 1 | N |
2 | 6/30/2019 | 1 | Y |
3 | 8/14/2022 | 0 | N |
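To pin the rule down before writing SQL, here is a minimal plain-Python sketch of the selection (an illustration only, not from either answer below; it reads the rule as: ignore rows dated exactly today, prefer the nearest past date when one exists, otherwise take the nearest future date; how a group mixing past and future dates should behave is left ambiguous by the conditions above):
from datetime import date

today = date(2022, 6, 17)
rows = [  # (Sl.No, Date, status, flag) copied from the sample data
    (1, date(2022, 8, 25), 1, "Y"), (1, date(2022, 6, 17), 0, "N"),
    (1, date(2022, 8, 24), 0, "Y"), (1, date(2022, 6, 20), 1, "N"),
    (2, date(2019, 6, 28), 1, "N"), (2, date(2019, 6, 11), 1, "N"),
    (2, date(2019, 6, 30), 1, "Y"),
    (3, date(2023, 7, 25), 1, "Y"), (3, date(2023, 6, 17), 0, "Y"),
    (3, date(2022, 8, 14), 0, "N"), (3, date(2023, 8, 5), 0, "N"),
]

groups = {}
for r in rows:
    groups.setdefault(r[0], []).append(r)

for sl_no, grp in sorted(groups.items()):
    past = [r for r in grp if r[1] < today]    # strictly before today
    future = [r for r in grp if r[1] > today]  # strictly after today
    pool = past if past else future            # past preferred here; mixed groups are ambiguous
    print(min(pool, key=lambda r: abs((r[1] - today).days)))
This prints the three expected rows: (1, 6/20/2022), (2, 6/30/2019), and (3, 8/14/2022).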
CodePudding user response:
Please refer to the query below -
with cte as
(
  -- ddiff: absolute distance in days between each row's date and today
  select slno, dates, status, flag,
         abs(datediff(now(), dates)) ddiff
  from near_date
), cte_1 as
(
  -- smallest non-zero distance per group (rows dated exactly today are skipped)
  select slno, min(ddiff) mdiff
  from cte
  where ddiff > 0
  group by slno
)
select cte.slno, cte.dates, cte.status, cte.flag
from cte
join cte_1
  on cte.slno = cte_1.slno
 and cte.ddiff = cte_1.mdiff;
DB fiddle here.
CodePudding user response:
If you can fit the entire group into the memory of an executor, then here's a solution for you.
import pyspark.sql.functions as f
target = "6/17/2022"
df = spark.createDataFrame( data=[
(1,"8/25/2022",1,"Y"),
(1,"6/17/2022",0,"N"),
(1,"8/24/2022",0,"Y"),
(1,"6/20/2022",1,"N"),
(2,"6/28/2019",1,"N"),
(2,"6/11/2019",1,"N"),
(2,"6/30/2019",1,"Y"),
(3,"7/25/2023",1,"Y"),
(3,"6/17/2023",0,"Y"),
(3,"8/14/2022",0,"N"),
(3,"8/5/2023",0,"N"),],
schema = ["Sl_No","Date","status","flag"]
).withColumn("date", f.to_date(f.col("Date"), "MM/dd/yyyy") #convert to date
).withColumn("target", f.to_date(f.lit(target), "MM/dd/yyyy")) #add target row
grpd = df.groupby("Sl_No"
).agg(
    f.reverse(f.sort_array(  # sort descending; structs compare field by field, and Sl_No is constant within a group, so date (the second field) drives the order
        f.collect_list(  # collect every row of the group into one array -- MUST FIT IN MEMORY
            f.struct(  # wrap each row in a struct so all its columns travel together
                *[f.col(column) for column in df.columns]  # shorthand to pass varargs of all columns
            )
        ))).alias("grouped_rows")
)
grpd.select(
    f.when(
        f.col("grouped_rows")[0].Date > f.col("grouped_rows")[0].target,  # latest date is past the target, i.e. the group has future dates
        f.expr("sort_array(filter(grouped_rows, x -> x.Date > x.target))")[0]  # keep only future dates, re-sort ascending, take the nearest one
    ).otherwise(  # no future dates: every date is on or before the target
        f.expr("filter(grouped_rows, x -> x.Date <= x.target)")[0]  # array is already sorted descending, so [0] is the nearest past date
    ).alias("rep")
).select(
    f.col("rep.*")  # expand the struct's fields back into ordinary columns
).show()
+-----+----------+------+----+----------+
|Sl_No|      date|status|flag|    target|
+-----+----------+------+----+----------+
|    1|2022-06-20|     1|   N|2022-06-17|
|    3|2022-08-14|     0|   N|2022-06-17|
|    2|2019-06-30|     1|   Y|2022-06-17|
+-----+----------+------+----+----------+
The explain plan for this code shows a single shuffle:
== Physical Plan ==
Project [CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.Sl_No AS Sl_No#996L, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.date AS date#997, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.status AS status#998L, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.flag AS flag#999, CASE WHEN (grouped_rows#886[0].Date > grouped_rows#886[0].target) THEN sort_array(filter(grouped_rows#886, lambdafunction((lambda x#993.Date > lambda x#993.target), lambda x#993, false)), true)[0] ELSE filter(grouped_rows#886, lambdafunction((lambda x#994.Date <= lambda x#994.target), lambda x#994, false))[0] END.target AS target#1000]
+- ObjectHashAggregate(keys=[Sl_No#708L], functions=[collect_list(named_struct(Sl_No, Sl_No#708L, date, date#716, status, status#710L, flag, flag#711, target, 19160), 0, 0)])
   +- Exchange hashpartitioning(Sl_No#708L, 200)
      +- ObjectHashAggregate(keys=[Sl_No#708L], functions=[partial_collect_list(named_struct(Sl_No, Sl_No#708L, date, date#716, status, status#710L, flag, flag#711, target, 19160), 0, 0)])
         +- *(1) Project [Sl_No#708L, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date) AS date#716, status#710L, flag#711]
            +- Scan ExistingRDD[Sl_No#708L,Date#709,status#710L,flag#711]
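For comparison, the plan below comes from running the first answer's SQL logic through the same Spark session. Here is a sketch of how that query might translate to the DataFrame API (reusing the df built above; an approximation of what produced the plan, not a verified reproduction):
cte = df.withColumn("ddiff", f.abs(f.datediff(f.col("target"), f.col("date"))))  # |days from target|
cte_1 = (cte.where(f.col("ddiff") > 0)  # drop rows dated exactly on the target
            .groupby("Sl_No")
            .agg(f.min("ddiff").alias("mdiff")))  # smallest distance per group
result = (cte.join(cte_1, (cte["Sl_No"] == cte_1["Sl_No"]) & (cte["ddiff"] == cte_1["mdiff"]))
             .select(cte["Sl_No"], cte["date"], cte["status"], cte["flag"]))
result.explain()  # prints the physical plan below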
The explain plan for the other answer's SQL has 6 shuffles:
== Physical Plan ==
*(6) Project [slno#980L, date#716, status#710L, flag#711]
+- *(6) SortMergeJoin [slno#980L, ddiff#981], [slno#984L, mdiff#982], Inner
   :- *(2) Sort [slno#980L ASC NULLS FIRST, ddiff#981 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(slno#980L, ddiff#981, 200)
   :     +- *(1) Project [Sl_No#708L AS slno#980L, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date) AS date#716, status#710L, flag#711, abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date))) AS ddiff#981]
   :        +- *(1) Filter (isnotnull(Sl_No#708L) && isnotnull(abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date)))))
   :           +- Scan ExistingRDD[Sl_No#708L,Date#709,status#710L,flag#711]
   +- *(5) Sort [slno#984L ASC NULLS FIRST, mdiff#982 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(slno#984L, mdiff#982, 200)
         +- *(4) Filter isnotnull(mdiff#982)
            +- *(4) HashAggregate(keys=[slno#984L], functions=[min(ddiff#985)])
               +- Exchange hashpartitioning(slno#984L, 200)
                  +- *(3) HashAggregate(keys=[slno#984L], functions=[partial_min(ddiff#985)])
                     +- *(3) Project [Sl_No#708L AS slno#984L, abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date))) AS ddiff#985]
                        +- *(3) Filter ((abs(datediff(19160, cast(cast(unix_timestamp(Date#709, MM/dd/yyyy, Some(America/Toronto)) as timestamp) as date))) > 0) && isnotnull(Sl_No#708L))
                           +- Scan ExistingRDD[Sl_No#708L,Date#709,status#710L,flag#711]
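If a group might not fit in executor memory, the collect_list approach stops being safe. A window-function variant of the same selection is a possible middle ground; this is a sketch only, mirroring the when/otherwise semantics above (take the nearest future date when any exists, otherwise the nearest past date), reusing the df built earlier:
from pyspark.sql import Window
import pyspark.sql.functions as f

# future rows sort before today/past rows; within each side, nearest to the target first
w = Window.partitionBy("Sl_No").orderBy(
    (f.col("date") <= f.col("target")).cast("int"),    # 0 for future rows, 1 for today/past
    f.abs(f.datediff(f.col("date"), f.col("target")))  # distance in days from the target
)

picked = (df.withColumn("rn", f.row_number().over(w))
            .where(f.col("rn") == 1)  # keep the top-ranked row per Sl_No
            .drop("rn"))
picked.show()
Like the grouped version, this should need only a single shuffle (the exchange feeding the window sort), but it never materializes a whole group as one array.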