sql - how to join on a column that is less than another join key

Time:02-25

I have two tables as below. What I'm trying to do is to join A and B based on date and id, to get the value from B. The problem is, I want to join using add_months(A.Date, -1) = B.Date (i.e. find the row in table B from one month earlier). If that's not available, I want to fall back to two months earlier: add_months(A.Date, -2) = B.Date. How can I achieve this in one query? In the result all 3 rows should be joined. Spark SQL is preferred over the DataFrame API. Many thanks.

Table A:

    +-----+---------+
    | ID  | Date    |
    +-----+---------+
    | A   | 2022-02 |
    | B   | 2022-02 |
    | C   | 2022-02 |
    +-----+---------+

Table B:

    +-----+---------+-------+
    | ID  | Date    | value |
    +-----+---------+-------+
    | A   | 2022-01 | V1    |
    | B   | 2022-01 | V2    |
    | C   | 2021-12 | V3    |
    +-----+---------+-------+


Expected output:

    +-----+---------+-------+
    | ID  | ADate   | value |
    +-----+---------+-------+
    | A   | 2022-02 | V1    |  -- from join condition add_months(A.Date, -1) = B.Date
    | B   | 2022-02 | V2    |  -- from join condition add_months(A.Date, -1) = B.Date
    | C   | 2022-02 | V3    |  -- from join condition add_months(A.Date, -2) = B.Date
    +-----+---------+-------+

CodePudding user response:

One way I can think of: create the required lagged dates as columns on A and join them against B's date, as below -

Data Preparation

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# `sql` is the SparkSession used throughout
sql = SparkSession.builder.getOrCreate()

df1 = pd.DataFrame({
    'id':['A','B','C'],
    'Date':['2022-02'] * 3
})


sparkDF1 = sql.createDataFrame(df1)

# Pre-compute the one- and two-month lags of A's date
sparkDF1 = sparkDF1.withColumn('date_lag_1',F.add_months(F.col('Date'),-1))\
                   .withColumn('date_lag_2',F.add_months(F.col('Date'),-2))

df2 = pd.DataFrame({
    'id':['A','B','C'],
    'Date':['2022-01','2022-01','2021-12'] ,
    'Value':['V1','V2','V3']
})


sparkDF2 = sql.createDataFrame(df2)


sparkDF1.show()
+---+-------+----------+----------+
| id|   Date|date_lag_1|date_lag_2|
+---+-------+----------+----------+
|  A|2022-02|2022-01-01|2021-12-01|
|  B|2022-02|2022-01-01|2021-12-01|
|  C|2022-02|2022-01-01|2021-12-01|
+---+-------+----------+----------+

sparkDF2.show()

+---+-------+-----+
| id|   Date|Value|
+---+-------+-----+
|  A|2022-01|   V1|
|  B|2022-01|   V2|
|  C|2021-12|   V3|
+---+-------+-----+

Join - Spark API

finalDF = sparkDF1.join(sparkDF2
                        ,   ( sparkDF1['id'] == sparkDF2['id'] )
                          & (     (sparkDF1['date_lag_1'] == F.to_date(sparkDF2['date'],'yyyy-MM'))
                                | (sparkDF1['date_lag_2'] == F.to_date(sparkDF2['date'],'yyyy-MM')) 
                            )
                        ,'inner'
          ).select(sparkDF1['id']
                   ,sparkDF1['Date']
                   ,sparkDF2['Value']
          ).orderBy(F.col('id'))

finalDF.show()

+---+-------+-----+
| id|   Date|Value|
+---+-------+-----+
|  A|2022-02|   V1|
|  B|2022-02|   V2|
|  C|2022-02|   V3|
+---+-------+-----+

Join - SparkSQL

sparkDF1.createOrReplaceTempView("TB1")
sparkDF2.createOrReplaceTempView("TB2")

sql.sql("""
SELECT
    a.ID
    ,a.DATE
    ,b.VALUE
FROM TB1 a
INNER JOIN TB2 b
    ON a.ID = b.ID
    AND (ADD_MONTHS(a.DATE,-1) = B.DATE OR ADD_MONTHS(a.DATE,-2) = B.DATE)
ORDER BY a.ID
""").show()

+---+-------+-----+
| ID|   DATE|VALUE|
+---+-------+-----+
|  A|2022-02|   V1|
|  B|2022-02|   V2|
|  C|2022-02|   V3|
+---+-------+-----+
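
Note that the OR between the two lag conditions does not express a preference: if table B happened to contain rows for both one and two months earlier for the same id, the inner join above would return both matches. A minimal sketch of a prioritized fallback, using pandas (already used above for data preparation) with hypothetical helper columns `lag1`/`lag2`, could look like this:

```python
import pandas as pd

# Same sample data as the Spark answer above
df1 = pd.DataFrame({'id': ['A', 'B', 'C'], 'Date': ['2022-02'] * 3})
df2 = pd.DataFrame({'id': ['A', 'B', 'C'],
                    'Date': ['2022-01', '2022-01', '2021-12'],
                    'Value': ['V1', 'V2', 'V3']})

# Compute the one- and two-month lags of A's month (lag1/lag2 mirror
# the date_lag_1/date_lag_2 columns of the Spark answer)
a = df1.copy()
dt = pd.to_datetime(a['Date'], format='%Y-%m')
a['lag1'] = (dt - pd.DateOffset(months=1)).dt.strftime('%Y-%m')
a['lag2'] = (dt - pd.DateOffset(months=2)).dt.strftime('%Y-%m')

# Left-join on the one-month lag first, then on the two-month lag,
# and keep the two-month value only where the first join found nothing
m1 = a.merge(df2, left_on=['id', 'lag1'], right_on=['id', 'Date'],
             how='left', suffixes=('', '_b'))
m2 = a.merge(df2, left_on=['id', 'lag2'], right_on=['id', 'Date'],
             how='left', suffixes=('', '_b'))
m1['Value'] = m1['Value'].fillna(m2['Value'])

result = m1[['id', 'Date', 'Value']]
print(result)  # Value column: V1, V2, V3
```

The same priority can be expressed in Spark with two left joins combined through `F.coalesce`, or with a `row_number()` window that ranks the one-month match ahead of the two-month one.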

CodePudding user response:

I never found a less tedious solution than this: put each of the two tables into a common table expression (CTE), add a LEAD() expression over the date column to each, and finally join the two CTEs on a compound condition with an equi-predicate on the id and a range predicate on the start and end dates:

WITH                                                                                                                                                                                         
-- your input , don't use in final query
a(id,dt) AS (
          SELECT 'a', DATE '2022-02-01'
UNION ALL SELECT 'b', DATE '2022-02-01'
UNION ALL SELECT 'c', DATE '2022-02-01'
)
,
b(id,dt,val) AS (
          SELECT 'a', DATE '2022-01-01','V1'
UNION ALL SELECT 'b', DATE '2022-01-01','V2'
UNION ALL SELECT 'c', DATE '2021-12-01','V3'
)
-- end of your input; real query starts here
-- replace following comma with "WITH"
,
a_w_next_date AS (
  SELECT
    *
  , LEAD(dt,1,DATE '9999-12-01') OVER(PARTITION BY id ORDER BY dt) AS next_dt
  FROM a
)
,
b_w_next_date AS (
  SELECT
    *
  , LEAD(dt,1,DATE '9999-12-01') OVER(PARTITION BY id ORDER BY dt) AS next_dt
  FROM b
)
SELECT
  a.id
, a.dt
, b.val
FROM a_w_next_date a
JOIN b_w_next_date b
  ON a.id = b.id
 AND a.dt >= b.dt
 AND a.dt < b.next_dt
 AND b.dt < a.next_dt
;
-- out  id |     dt     | val 
-- out ---- ------------ -----
-- out  a  | 2022-02-01 | V1
-- out  b  | 2022-02-01 | V2
-- out  c  | 2022-02-01 | V3
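
The range join above is essentially an "as-of" join: each row of b is valid from its own date until the next b row for the same id, and each a row picks the b interval that contains it. As a cross-check, the same result can be sketched with pandas' `merge_asof` (`direction='backward'` takes the most recent b row at or before a's date); the frame and column names mirror the CTE example:

```python
import pandas as pd

# Tables a and b from the CTE example, with the months parsed to dates
a = pd.DataFrame({'id': ['a', 'b', 'c'],
                  'dt': pd.to_datetime(['2022-02-01'] * 3)})
b = pd.DataFrame({'id': ['a', 'b', 'c'],
                  'dt': pd.to_datetime(['2022-01-01', '2022-01-01',
                                        '2021-12-01']),
                  'val': ['V1', 'V2', 'V3']})

# For each row of a, take the most recent row of b (per id) whose date
# is at or before a's date; both frames must be sorted on 'dt'
out = pd.merge_asof(a.sort_values('dt'), b.sort_values('dt'),
                    on='dt', by='id', direction='backward')
print(out)  # val column: V1, V2, V3
```

Unlike the question's two-month cutoff, `merge_asof` looks back indefinitely by default; its `tolerance` parameter (e.g. `pd.Timedelta(days=62)`) caps how far back a match may reach.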