left join with both aggregate function and select on joining table


I have a general SQL query question that I think could be answered in various SQL flavors, although my example below uses Spark SQL.

I am trying to left join table 1 (t1) onto table 2 (t2), with the following goals:

  1. Keep all the values in t1 (hence left join)
  2. Select a column from t2 based on an aggregate function over t2

Here are some test data:

t1
+---+--------+
|pk1|constant|
+---+--------+
|  a|constant|
|  b|constant|
|  c|constant|
|  d|constant|
+---+--------+
t2
+---+---------+------+
|fk1|condition|target|
+---+---------+------+
|  a|        1|check1|
|  a|        2|check2|
|  b|        1|check1|
|  b|        2|check2|
+---+---------+------+

Here are a couple of (failed) sample queries:

spark.sql("""
    select
        pk1,
        constant,
        target
    from
        t1
    left join
        t2
    on
        pk1 = fk1
    group by
        pk1, constant, target
    having
        min(condition)
""").show
+---+--------+------+
|pk1|constant|target|
+---+--------+------+
|  b|constant|check1|
|  a|constant|check2|
|  a|constant|check1|
|  b|constant|check2|
+---+--------+------+

Problem with query 1: I have lost the t1 rows for pk1 in ('c', 'd'). It looks like an inner join to me, not a left join.

spark.sql("""
    select
        pk1,
        constant,
        min(condition),
        target
    from
        t1
    left join
        t2
    on
        pk1 = fk1
    group by
        pk1, constant, target
""").show
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
|  a|constant|             1|check1|
|  a|constant|             2|check2|
|  b|constant|             2|check2|
|  b|constant|             1|check1|
|  c|constant|          null|  null|
|  d|constant|          null|  null|
+---+--------+--------------+------+

Problem with query 2: I no longer have a filter on the minimum value of condition. For example, for pk1 = a, I have kept both condition = 1 and condition = 2. The min function was seemingly not applied as a filter.

Desired Output

+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
|  a|constant|             1|check1|
|  b|constant|             1|check1|
|  c|constant|          null|  null|
|  d|constant|          null|  null|
+---+--------+--------------+------+

The min(condition) column is optional; I will filter it out later anyway.

I can create the desired output by separating the query into two statements, but I feel like there must be an elegant single-query solution here. Would someone have an idea how to accomplish this? Thank you!

Appendix

Here are the commands to construct the test tables in case anyone wants to reproduce the test:

// Build the test DataFrames
val columns1 = Seq("pk1", "constant")
val columns2 = Seq("fk1", "condition", "target")
val data1 = Seq( ("a","constant"), ("b","constant"), ("c","constant"), ("d","constant") )
val data2 = Seq( ("a",1,"check1"), ("a",2,"check2"), ("b",1,"check1"), ("b",2,"check2") )
val t1 = spark.createDataFrame(data1).toDF(columns1:_*)
val t2 = spark.createDataFrame(data2).toDF(columns2:_*)

// Register temp views so the spark.sql queries above can see t1 and t2
t1.createOrReplaceTempView("t1")
t2.createOrReplaceTempView("t2")

Answer:

First, group t2 by fk1 and use min on struct(condition, target) to get the target value corresponding to the minimum condition, then join the grouped result with t1:

spark.sql("""   
  WITH t3 AS (
    SELECT  fk1, 
            MIN(struct(condition, target))['target'] AS target
    FROM    t2
    GROUP BY fk1
  )

  SELECT  pk1,
          constant,
          target
  FROM    t1
  LEFT JOIN t3
  ON    pk1 = fk1
""").show

//+---+--------+------+
//|pk1|constant|target|
//+---+--------+------+
//|  a|constant|check1|
//|  b|constant|check1|
//|  c|constant|  null|
//|  d|constant|  null|
//+---+--------+------+
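
For reference, the same min-on-struct trick can be written with the DataFrame API. This is a minimal sketch assuming the t1 and t2 DataFrames from the appendix (the intermediate name grouped is mine); it relies on Spark comparing structs field by field from left to right, so the minimum struct is the one with the smallest condition, and its target comes along for the ride:

import org.apache.spark.sql.functions.{col, min, struct}

// Per fk1, keep the target belonging to the smallest condition
val grouped = t2
  .groupBy("fk1")
  .agg(min(struct(col("condition"), col("target")))("target").as("target"))

// Left join back onto t1 so the unmatched keys (c, d) survive
t1.join(grouped, col("pk1") === col("fk1"), "left")
  .select("pk1", "constant", "target")
  .show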

Another way is to use the row_number() window function:

spark.sql("""
    WITH t3 AS (
      SELECT  *, 
              ROW_NUMBER() OVER (PARTITION BY fk1 ORDER BY condition) AS rn
      FROM    t2
    )
    
    SELECT  pk1,
            constant,
            target
    FROM    t1
    LEFT JOIN t3
    ON    pk1 = fk1
    AND   rn = 1
""").show

And if you want to left join first and then do the aggregation:

spark.sql("""   
    SELECT  pk1,
            MAX(constant) AS constant,
            MIN(struct(condition, target))['target'] AS target
    FROM    t1
    LEFT JOIN t2
    ON    pk1 = fk1
    GROUP BY pk1
""").show