I have a general SQL query question that I think could be answered in various SQL flavors, although my example below is using spark sql.
I am trying to left join table 1 (t1) with table 2 (t2), with the following goals:
- Keep all the values in t1 (hence the left join)
- Select a column in t2 based on an aggregate function in t2
Here are some test data:
t1
+---+--------+
|pk1|constant|
+---+--------+
|  a|constant|
|  b|constant|
|  c|constant|
|  d|constant|
+---+--------+
t2
+---+---------+------+
|fk1|condition|target|
+---+---------+------+
|  a|        1|check1|
|  a|        2|check2|
|  b|        1|check1|
|  b|        2|check2|
+---+---------+------+
Here are a couple of (failed) sample queries:
spark.sql("""
  select pk1,
         constant,
         target
  from t1
  left join t2
    on pk1 = fk1
  group by pk1, constant, target
  having min(condition)
""").show
+---+--------+------+
|pk1|constant|target|
+---+--------+------+
|  b|constant|check1|
|  a|constant|check2|
|  a|constant|check1|
|  b|constant|check2|
+---+--------+------+
Problem with query 1: I have lost the t1 rows where pk1 is 'c' or 'd'. It looks like an inner join to me, not a left join.
spark.sql("""
  select pk1,
         constant,
         min(condition),
         target
  from t1
  left join t2
    on pk1 = fk1
  group by pk1, constant, target
""").show
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
|  a|constant|             1|check1|
|  a|constant|             2|check2|
|  b|constant|             2|check2|
|  b|constant|             1|check1|
|  c|constant|          null|  null|
|  d|constant|          null|  null|
+---+--------+--------------+------+
Problem with query 2: I no longer have a filter on the minimum value of condition. For example, for pk1 = a, both condition = 1 and condition = 2 are kept; the min function was seemingly not applied as a filter.
Desired Output
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
|  a|constant|             1|check1|
|  b|constant|             1|check1|
|  c|constant|          null|  null|
|  d|constant|          null|  null|
+---+--------+--------------+------+
The min(condition) column is optional; I will filter it out later anyway.
I can create the desired output by separating the query into two statements, but I feel like there must be an elegant single-query solution here. Would someone have an idea how to accomplish this? Thank you!
Appendix
Here are the commands to construct the test tables in case anyone wants to duplicate the test:
val columns1 = Seq("pk1", "constant")
val columns2 = Seq("fk1","condition","target")
val data1 = Seq( ("a","constant"), ("b","constant"), ("c","constant"), ("d","constant") )
val data2 = Seq( ("a",1,"check1"), ("a",2,"check2"), ("b",1,"check1"), ("b",2,"check2") )
val t1 = spark.createDataFrame(data1).toDF(columns1:_*)
val t2 = spark.createDataFrame(data2).toDF(columns2:_*)
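To run the spark.sql queries above against these DataFrames, they also need to be registered as temporary views:
t1.createOrReplaceTempView("t1")
t2.createOrReplaceTempView("t2")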
CodePudding user response:
First, group t2 by fk1 and use min on struct(condition, target) to get the target value corresponding to the minimum condition, then join the grouped result with t1:
spark.sql("""
WITH t3 AS (
SELECT fk1,
MIN(struct(condition, target))['target'] AS target
FROM t2
GROUP BY fk1
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
""").show
// +---+--------+------+
// |pk1|constant|target|
// +---+--------+------+
// |  a|constant|check1|
// |  b|constant|check1|
// |  c|constant|  null|
// |  d|constant|  null|
// +---+--------+------+
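For reference, the same idea with the DataFrame API would look roughly like this (untested sketch; t3 is just a local val here):
import org.apache.spark.sql.functions.{col, min, struct}

// Pre-aggregate t2: for each fk1, keep the target of the row with the smallest condition
val t3 = t2
  .groupBy("fk1")
  .agg(min(struct("condition", "target")).as("m"))
  .select(col("fk1"), col("m.target").as("target"))

// Left join so that t1 rows without a match (c, d) are kept with a null target
t1.join(t3, col("pk1") === col("fk1"), "left")
  .select("pk1", "constant", "target")
  .show()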
Another way, using the row_number() window function:
spark.sql("""
WITH t3 AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY fk1 ORDER BY condition) AS rn
FROM t2
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
AND rn = 1
""").show
And if you want to left join first and then do the aggregation:
spark.sql("""
SELECT pk1,
MAX(constant) AS constant,
MIN(struct(condition, target))['target'] AS target
FROM t1
LEFT JOIN t2
ON pk1 = fk1
GROUP BY pk1
""").show