Spark SQL: query nested fields within/as an inner query

Time:09-30

I have a dataframe of products with a nested field (categories), which is an array of structs. Some product properties change over time as they get corrected:

+---------+------------+-------+------------------------+
| product |    date    | brand | categories             |
+=========+============+=======+========================+
|    1    | 01.01.2020 | b1    | name: ca1, taxonomy: a |
|         |            |       +------------------------+
|         |            |       | name: cb1, taxonomy: b |
+---------+------------+-------+------------------------+
|    2    | 01.01.2020 | b3    | name: ca3, taxonomy: a |
+---------+------------+-------+------------------------+
|    1    | 02.01.2020 | b2    | name: ca2, taxonomy: a |
|         |            |       +------------------------+
|         |            |       | name: cb2, taxonomy: b |
+---------+------------+-------+------------------------+
|    1    | 03.01.2020 |       |                        |
+---------+------------+-------+------------------------+

For each product, I would like to get the most recent non-null brand, category_a (the category name under taxonomy a), and category_b (the category name under taxonomy b). The expected outcome looks like this:

+---------+-------+------------+------------+
| product | brand | category_a | category_b |
+=========+=======+============+============+
|    1    | b2    | ca2        | cb2        |
+---------+-------+------------+------------+
|    2    | b3    | ca3        |            |
+---------+-------+------------+------------+

Assuming a view is created for this dataframe and named products, I have tried the following query:

SELECT DISTINCT
  p.product AS product,
  LAST_VALUE(p.brand) IGNORE NULLS OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS brand,
  LAST_VALUE((SELECT name FROM LATERAL VIEW EXPLODE(p.categories) WHERE taxonomy = "a" LIMIT 1)) IGNORE NULLS OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_a,
  LAST_VALUE((SELECT name FROM LATERAL VIEW EXPLODE(p.categories) WHERE taxonomy = "b" LIMIT 1)) IGNORE NULLS OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_b
FROM
  products AS p

This query raises an exception:

pyspark.sql.utils.AnalysisException: Accessing outer query column is not allowed in:
Generate explode(outer(categories..))

Although the exception is clear, I think the use case is not unique, and there should be a solution to this problem that I have not been able to find so far.

I know I can get the outcome I am expecting using BigQuery's Standard SQL:

SELECT DISTINCT
  p.product AS product,
  LAST_VALUE(p.brand IGNORE NULLS) OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS brand,
  LAST_VALUE((SELECT name FROM UNNEST(p.categories) WHERE taxonomy = "a") IGNORE NULLS) OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_a,
  LAST_VALUE((SELECT name FROM UNNEST(p.categories) WHERE taxonomy = "b") IGNORE NULLS) OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_b
FROM
  products AS p

I can also get this outcome if I break the query into two (one per category taxonomy) and cross join the "exploded" categories with the original products view, but is there a way to get this in one query?

CodePudding user response:

Given your df:

import json

# Assumes an existing SparkSession named `spark` (as in the pyspark shell).
df = spark.read.json(spark.sparkContext.parallelize([
    {'product': 1, 'date': '2020-01-01', 'brand': 'b1', 'categories': [{'name': 'ca1', 'taxonomy': 'a'}, {'name': 'cb1', 'taxonomy': 'b'}]},
    {'product': 2, 'date': '2020-01-01', 'brand': 'b3', 'categories': [{'name': 'ca3', 'taxonomy': 'a'}]},
    {'product': 1, 'date': '2020-01-02', 'brand': 'b2', 'categories': [{'name': 'ca2', 'taxonomy': 'a'}, {'name': 'cb2', 'taxonomy': 'b'}]},
    {'product': 1, 'date': '2020-01-03', 'brand': None, 'categories': None}
]).map(json.dumps))
# Register the dataframe as a view so that SQL can refer to it as `df`.
df.createOrReplaceTempView("df")
df.show(truncate=False)
+-----+--------------------+----------+-------+
|brand|categories          |date      |product|
+-----+--------------------+----------+-------+
|b1   |[{ca1, a}, {cb1, b}]|2020-01-01|1      |
|b3   |[{ca3, a}]          |2020-01-01|2      |
|b2   |[{ca2, a}, {cb2, b}]|2020-01-02|1      |
|null |null                |2020-01-03|1      |
+-----+--------------------+----------+-------+

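You can use the following SQL. It first turns categories into a taxonomy-to-name map with map_from_arrays, then takes the last non-null value of each field per product: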

spark.sql("""
with df_with_maps as (
  select product, brand, date, map_from_arrays(categories.taxonomy, categories.name) as category_map from df
)

select distinct product,
last(brand, true) over(PARTITION by product order by date asc rows BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as brand, 
last(category_map['a'], true) over(PARTITION by product order by date asc rows BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as category_a, 
last(category_map['b'], true) over(PARTITION by product order by date asc rows BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as category_b 
from df_with_maps
""").show(truncate=False)
+-------+-----+----------+----------+
|product|brand|category_a|category_b|
+-------+-----+----------+----------+
|1      |b2   |ca2       |cb2       |
|2      |b3   |ca3       |null      |
+-------+-----+----------+----------+
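
To make the semantics of last(..., true) over an unbounded window concrete, here is a plain-Python sketch of what the query computes. It needs no Spark. The function name last_non_null and the variable names are made up for illustration, and it assumes each taxonomy appears at most once per row.

```python
# Plain-Python model of the query's semantics (illustration only, no Spark).
rows = [
    {"product": 1, "date": "2020-01-01", "brand": "b1",
     "categories": [{"name": "ca1", "taxonomy": "a"}, {"name": "cb1", "taxonomy": "b"}]},
    {"product": 2, "date": "2020-01-01", "brand": "b3",
     "categories": [{"name": "ca3", "taxonomy": "a"}]},
    {"product": 1, "date": "2020-01-02", "brand": "b2",
     "categories": [{"name": "ca2", "taxonomy": "a"}, {"name": "cb2", "taxonomy": "b"}]},
    {"product": 1, "date": "2020-01-03", "brand": None, "categories": None},
]


def last_non_null(rows):
    """Return, per product, the latest non-null brand and category names."""
    latest = {}
    # ISO-formatted dates sort correctly as strings (the ORDER BY date part).
    for row in sorted(rows, key=lambda r: r["date"]):
        # Counterpart of map_from_arrays: taxonomy -> name. Spark yields a null
        # map for null categories; an empty dict behaves the same for lookups.
        category_map = {c["taxonomy"]: c["name"] for c in row["categories"] or []}
        out = latest.setdefault(
            row["product"], {"brand": None, "category_a": None, "category_b": None}
        )
        candidates = {
            "brand": row["brand"],
            "category_a": category_map.get("a"),
            "category_b": category_map.get("b"),
        }
        for key, value in candidates.items():
            if value is not None:  # "ignore nulls": a null never overwrites
                out[key] = value
    return latest


summary = last_non_null(rows)
print(summary)
```

The row dated 2020-01-03 has no brand and no categories, so it leaves the values from 2020-01-02 in place for product 1. This is the effect of the ignore-nulls flag in the SQL.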

This can be written with the PySpark DataFrame API as well; I just assumed you prefer raw SQL.
