I'm new to SQL, coming over from Python and R, and using Spark SQL with Databricks. I'm trying to complete a basic query and would appreciate guidance, especially guidance that explains the underlying concepts of SQL as they relate to my question.
I have a calendar table with complete, consecutive dates, and a data table with date_added, user_id, sales, and price columns. The data table has incomplete dates, since not every user is active on every date. Below are examples of each table.
Calendar Table
date
2020-01-01
2020-01-02
2020-01-03
2020-01-04
2020-01-05
2020-01-06
Data Table
date_added user_id sales price
2020-01-02 01 1 4.00
2020-01-05 01 3 4.00
2020-01-02 02 1 5.00
2020-01-03 02 1 5.00
2020-01-05 02 2 5.00
2020-01-03 03 2 1.00
2020-01-05 03 5 1.00
I am looking to create a new table in which every calendar date within a certain range (the active dates) appears for every user, and nulls in every column other than sales are filled with the following value in that column. Something along these lines:
date user_id sales price
2020-01-02 01 1 4.00
2020-01-03 01 null 4.00
2020-01-04 01 null 4.00
2020-01-05 01 3 4.00
2020-01-02 02 1 5.00
2020-01-03 02 1 5.00
2020-01-04 02 null 5.00
2020-01-05 02 2 5.00
2020-01-02 03 null 1.00
2020-01-03 03 2 1.00
2020-01-04 03 null 1.00
2020-01-05 03 5 1.00
Any guidance on how I might produce this output is appreciated. I've tried a LEFT JOIN on the dates, but without success; roughly what I attempted is shown below. I know that the UNION operator is used to stack tables on top of one another, but I don't know how I would apply that method here.
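For reference, this is roughly the shape of what I tried (the table names are placeholders for the views in my notebook):
# Join the calendar onto the data by date alone. This fills in the missing dates
# overall, but it doesn't give each user their own full run of dates, which is
# why it didn't get me to the output above.
spark.sql("""
SELECT c.date, d.user_id, d.sales, d.price
FROM calendar_table c
LEFT JOIN data_table d
  ON c.date = d.date_added
""").show()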
CodePudding user response:
You can cross join the users with the calendar table, then left join with the data table:
spark.sql("""
SELECT date, dates.user_id, sales, COALESCE(data.price, dates.price) AS price
FROM (
    SELECT user_id, price, date
    FROM   (SELECT user_id, FIRST(price) AS price FROM data_table GROUP BY user_id)
    CROSS JOIN calendar_table
    WHERE  date >= (SELECT MIN(date_added) FROM data_table)
    AND    date <= (SELECT MAX(date_added) FROM data_table)
) dates
LEFT JOIN data_table data
ON  dates.user_id = data.user_id
AND dates.date = data.date_added
""").show()
Output:
+----------+-------+-----+-----+
|date      |user_id|sales|price|
+----------+-------+-----+-----+
|2020-01-02|01     |1    |4.0  |
|2020-01-03|01     |null |4.0  |
|2020-01-04|01     |null |4.0  |
|2020-01-05|01     |3    |4.0  |
|2020-01-02|02     |1    |5.0  |
|2020-01-03|02     |1    |5.0  |
|2020-01-04|02     |null |5.0  |
|2020-01-05|02     |2    |5.0  |
|2020-01-02|03     |null |1.0  |
|2020-01-03|03     |2    |1.0  |
|2020-01-04|03     |null |1.0  |
|2020-01-05|03     |5    |1.0  |
+----------+-------+-----+-----+
You can also generate the dates without a calendar table by using the sequence function. See my other answer here.
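For reference, a minimal sketch of that approach with the same data_table as above, in case the link is not handy:
# Build the run of dates with sequence() + explode() instead of a calendar table,
# then reuse the same cross join / left join pattern as above.
spark.sql("""
WITH bounds AS (
    SELECT MIN(date_added) AS min_date, MAX(date_added) AS max_date FROM data_table
),
dates AS (
    SELECT explode(sequence(min_date, max_date, interval 1 day)) AS date FROM bounds
)
SELECT date, users.user_id, sales, COALESCE(data.price, users.price) AS price
FROM (SELECT user_id, FIRST(price) AS price FROM data_table GROUP BY user_id) users
CROSS JOIN dates
LEFT JOIN data_table data
  ON users.user_id = data.user_id
 AND dates.date = data.date_added
""").show()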
CodePudding user response:
Let your original dataframe be df1. Then you can get the min and max date_added for each user_id, expand that range into one row per date, and call the result df2.
from pyspark.sql import functions as f
from pyspark.sql import Window

# Window over each user, latest date first; first('price') over it copies the
# price at each user's latest date to all of that user's rows.
w = Window.partitionBy('user_id').orderBy(f.desc('date_added'))

# df2: one row per user per date between that user's first and last date_added.
df2 = df1.groupBy('user_id') \
    .agg(f.sequence(f.min('date_added'), f.max('date_added')).alias('date_added')) \
    .withColumn('date_added', f.explode('date_added'))

# Left join the original rows back in, then fill the price for the gap dates.
df2.join(df1, ['user_id', 'date_added'], 'left') \
    .withColumn('price', f.first('price').over(w)) \
    .orderBy('user_id', 'date_added') \
    .show()
+-------+----------+-----+-----+
|user_id|date_added|sales|price|
+-------+----------+-----+-----+
|      1|2020-01-02|    1|  4.0|
|      1|2020-01-03| null|  4.0|
|      1|2020-01-04| null|  4.0|
|      1|2020-01-05|    3|  4.0|
|      2|2020-01-02|    1|  5.0|
|      2|2020-01-03|    1|  5.0|
|      2|2020-01-04| null|  5.0|
|      2|2020-01-05|    2|  5.0|
|      3|2020-01-03|    2|  1.0|
|      3|2020-01-04| null|  1.0|
|      3|2020-01-05|    5|  1.0|
+-------+----------+-----+-----+
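Note that first('price') over the descending window works here because each user has a single price in the sample data. If the price can change over time and you really want each null filled by the following value, as described in the question, one way to sketch that (reusing df1, df2 and the imports above) is a windowed first with ignorenulls:
# Backfill sketch: fill each null price with the next non-null price for that user.
# The frame runs from the current row to the end of the partition (later dates).
w_next = Window.partitionBy('user_id') \
    .orderBy('date_added') \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

df2.join(df1, ['user_id', 'date_added'], 'left') \
    .withColumn('price', f.first('price', ignorenulls=True).over(w_next)) \
    .orderBy('user_id', 'date_added') \
    .show()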