I have a master table:
+---------+--------+------------+
| Age     | Gender | date       |
+---------+--------+------------+
| [1,2]   | M      | 2021-01-01 |
| [11,13] | M      | 2021-01-10 |
| [4,5]   | M      | 2021-01-15 |
| [3]     | M      | 2021-01-30 |
| [7]     | F      | 2021-02-01 |
| [2]     | F      | 2021-02-16 |
| [6]     | F      | 2021-02-20 |
+---------+--------+------------+
Required output (I have used a 15-day window here, but the window length should be configurable, e.g. 15, 30 or 45 days):
+---------+--------+--------------------------------------------------------------------+
| Age     | Gender | date_age_map                                                       |
+---------+--------+--------------------------------------------------------------------+
| [1,2]   | M      | [2021-01-01-->[1,2]]                                               |
| [11,13] | M      | [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]]]                      |
| [4,5]   | M      | [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]],[2021-01-15-->[4,5]]] |
| [3]     | M      | [2021-01-30-->[3]]                                                 |
| [7]     | F      | [2021-02-01-->[7]]                                                 |
| [2]     | F      | [[2021-02-01-->[7]],[2021-02-16-->[2]]]                            |
| [6]     | F      | [2021-02-20-->[6]]                                                 |
+---------+--------+--------------------------------------------------------------------+
Here is what I have tried so far:
spark.sql("""
select Age, Gender,
       collect_list(date_age_map) over (partition by Gender order by date) as date_age_map
from (select Age, Gender, date, map(date, Age) as date_age_map from master) t
""")
Is there a way to do this with window functions, using the Spark DataFrame API, Spark SQL or a UDF?
Answer:
You can use a Window with rangeBetween to bound the sliding interval when collecting the age and date values used to build your map.
Something like this:
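from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# sample data from the question: age arrays, gender, ISO date strings
df = spark.createDataFrame(
    [([1, 2], "M", "2021-01-01"), ([11, 13], "M", "2021-01-10"),
     ([4, 5], "M", "2021-01-15"), ([3], "M", "2021-01-30"),
     ([7], "F", "2021-02-01"), ([2], "F", "2021-02-16"),
     ([6], "F", "2021-02-20")],
    ["age", "gender", "date"],
)
days = 15  # window length in days; change to 30, 45, ...
# window covering rows from `days` days before the current row up to the current row;
# date2 holds seconds since the epoch, and 86400 is the number of seconds in one day
w = Window.partitionBy("gender").orderBy("date2").rangeBetween(-days * 86400, 0)
df1 = df.withColumn(
    "date2",
    F.col("date").cast("timestamp").cast("long")
).withColumn(
    "date_age_map",
    # keys come from the collected dates, values from the collected age arrays
    F.map_from_arrays(
        F.collect_list("date").over(w),
        F.collect_list("age").over(w)
    )
).drop("date2")
df1.show(truncate=False)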
#+--------+------+----------+--------------------------------------------------------------------+
#|age     |gender|date      |date_age_map                                                        |
#+--------+------+----------+--------------------------------------------------------------------+
#|[7]     |F     |2021-02-01|[2021-02-01 -> [7]]                                                 |
#|[2]     |F     |2021-02-16|[2021-02-01 -> [7], 2021-02-16 -> [2]]                              |
#|[6]     |F     |2021-02-20|[2021-02-16 -> [2], 2021-02-20 -> [6]]                              |
#|[1, 2]  |M     |2021-01-01|[2021-01-01 -> [1, 2]]                                              |
#|[11, 13]|M     |2021-01-10|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13]]                      |
#|[4, 5]  |M     |2021-01-15|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13], 2021-01-15 -> [4, 5]]|
#|[3]     |M     |2021-01-30|[2021-01-15 -> [4, 5], 2021-01-30 -> [3]]                           |
#+--------+------+----------+--------------------------------------------------------------------+