create list of map for columns based on days


I have a master table as follows:

+-----------+----------+-------------+
|  Age      | Gender   | date        |
+-----------+----------+-------------+
|  [1,2]    |   M      |  2021-01-01 |
|  [11,13]  |   M      |  2021-01-10 |
|  [4,5]    |   M      |  2021-01-15 |
|  [3]      |   M      |  2021-01-30 |
|  [7]      |   F      |  2021-02-01 |
|  [2]      |   F      |  2021-02-16 |
|  [6]      |   F      |  2021-02-20 |
+-----------+----------+-------------+

The required output is below. I have used a 15-day window here, but the date-age map interval should be configurable to 15, 30, 45 days, etc.

+-----------+----------+---------------------------------------------------------------------+
|  Age      | Gender   | date_age_map                                                        |
+-----------+----------+---------------------------------------------------------------------+
|  [1,2]    |   M      |  [2021-01-01-->[1,2]]                                               |
|  [11,13]  |   M      |  [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]]]                      |
|  [4,5]    |   M      |  [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]],[2021-01-15-->[4,5]]] |
|  [3]      |   M      |  [2021-01-30-->[3]]                                                 |
|  [7]      |   F      |  [2021-02-01-->[7]]                                                 |
|  [2]      |   F      |  [[2021-02-01-->[7]],[2021-02-16-->[2]]]                            |
|  [6]      |   F      |  [2021-02-20-->[6]]                                                 |
+-----------+----------+---------------------------------------------------------------------+

I have made some effort so far, as below (the inner query needs to be a parenthesized, aliased subquery, and it still lacks the 15-day frame):

spark.sql("""
select Age, Gender,
       collect_list(date_age_map) over (partition by Gender order by date) as date_age_map
from (
    select Age, Gender, map(date, Age) as date_age_map from master
) t
""")

Is there any way to do this with window functions, using the Spark DataFrame API, Spark SQL, or a UDF?

CodePudding user response:

You can use a Window with rangeBetween to set the sliding window interval when collecting the age and date values used to build your map.

Something like this:

from pyspark.sql import Window
import pyspark.sql.functions as F

days = 15

# define a window ranging from 15 days preceding to the current row;
# 86400 is the number of seconds in one day
w = Window.partitionBy("gender").orderBy("date2").rangeBetween(-days * 86400, 0)

df1 = df.withColumn(
    "date2",
    F.col("date").cast("timestamp").cast("long")
).withColumn(
    "date_age_map",
    F.map_from_arrays(
        F.collect_list("date").over(w),
        F.collect_list("age").over(w)
    )
).drop("date2")

df1.show(truncate=False)

#+--------+------+----------+--------------------------------------------------------------------+
#|age     |gender|date      |date_age_map                                                        |
#+--------+------+----------+--------------------------------------------------------------------+
#|[7]     |F     |2021-02-01|[2021-02-01 -> [7]]                                                 |
#|[2]     |F     |2021-02-16|[2021-02-01 -> [7], 2021-02-16 -> [2]]                              |
#|[6]     |F     |2021-02-20|[2021-02-16 -> [2], 2021-02-20 -> [6]]                              |
#|[1, 2]  |M     |2021-01-01|[2021-01-01 -> [1, 2]]                                              |
#|[11, 13]|M     |2021-01-10|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13]]                      |
#|[4, 5]  |M     |2021-01-15|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13], 2021-01-15 -> [4, 5]]|
#|[3]     |M     |2021-01-30|[2021-01-15 -> [4, 5], 2021-01-30 -> [3]]                           |
#+--------+------+----------+--------------------------------------------------------------------+
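Since you also asked for a Spark SQL option: the same frame can be expressed in SQL with a named window and a RANGE frame over an interval (ordering by a timestamp, which Spark requires for interval-based range frames). This is a sketch I have not run against your table; `master` and the column names are taken from your question:

```sql
SELECT Age,
       Gender,
       date,
       map_from_arrays(
           collect_list(date) OVER w,
           collect_list(Age)  OVER w
       ) AS date_age_map
FROM master
WINDOW w AS (
    PARTITION BY Gender
    ORDER BY CAST(date AS timestamp)
    RANGE BETWEEN INTERVAL 15 DAYS PRECEDING AND CURRENT ROW
)
```

Changing the interval to 30 or 45 days only requires editing the `INTERVAL` literal.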
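To see exactly what `rangeBetween(-days * 86400, 0)` collects, here is a plain-Python sketch of the same rolling logic, with no Spark needed (the row data mirrors the question's table; the helper name `date_age_map` is my own):

```python
from datetime import date, timedelta

DAYS = 15

# sample rows mirroring the master table: (age_list, gender, date)
rows = [
    ([1, 2], "M", date(2021, 1, 1)),
    ([11, 13], "M", date(2021, 1, 10)),
    ([4, 5], "M", date(2021, 1, 15)),
    ([3], "M", date(2021, 1, 30)),
    ([7], "F", date(2021, 2, 1)),
    ([2], "F", date(2021, 2, 16)),
    ([6], "F", date(2021, 2, 20)),
]

def date_age_map(rows, days=DAYS):
    """For each row, collect date -> age entries from rows of the same
    gender whose date falls within `days` days up to the current row,
    mimicking Window.partitionBy("gender").rangeBetween(-days*86400, 0)."""
    out = []
    for age, gender, d in rows:
        window = {
            str(d2): a2
            for a2, g2, d2 in rows
            if g2 == gender and timedelta(0) <= d - d2 <= timedelta(days=days)
        }
        out.append((age, gender, str(d), window))
    return out

for row in date_age_map(rows):
    print(row)
```

Note that, like the Spark output above, the row for 2021-01-30 still picks up 2021-01-15 (exactly 15 days earlier), since the range frame is inclusive on both ends.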