I have a dataframe of products with a nested field (categories), which is an Array of Structs. Some product properties change over time as they get corrected:
+---------+------------+-------+------------------------+
| product | date       | brand | categories             |
+=========+============+=======+========================+
| 1       | 01.01.2020 | b1    | name: ca1, taxonomy: a |
|         |            |       +------------------------+
|         |            |       | name: cb1, taxonomy: b |
+---------+------------+-------+------------------------+
| 2       | 01.01.2020 | b3    | name: ca3, taxonomy: a |
+---------+------------+-------+------------------------+
| 1       | 02.01.2020 | b2    | name: ca2, taxonomy: a |
|         |            |       +------------------------+
|         |            |       | name: cb2, taxonomy: b |
+---------+------------+-------+------------------------+
| 1       | 03.01.2020 |       |                        |
+---------+------------+-------+------------------------+
I would like to get the last set brand, category_a (based on taxonomy a), and category_b (based on taxonomy b) per product. So the expected outcome should look like:
+---------+-------+------------+------------+
| product | brand | category_a | category_b |
+=========+=======+============+============+
| 1       | b2    | ca2        | cb2        |
+---------+-------+------------+------------+
| 2       | b3    | ca3        |            |
+---------+-------+------------+------------+
Assuming a view named products is created for this dataframe, I have tried the following query:
SELECT DISTINCT
p.product AS product,
LAST_VALUE(p.brand) IGNORE NULLS OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS brand,
LAST_VALUE((SELECT name FROM LATERAL VIEW EXPLODE(p.categories) WHERE taxonomy = "a" LIMIT 1)) IGNORE NULLS OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_a,
LAST_VALUE((SELECT name FROM LATERAL VIEW EXPLODE(p.categories) WHERE taxonomy = "b" LIMIT 1)) IGNORE NULLS OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_b
FROM
products AS p
This query leads to an exception:
pyspark.sql.utils.AnalysisException: Accessing outer query column is not allowed in:
Generate explode(outer(categories..))
Although the exception is clear (referencing an outer query's column inside EXPLODE is not supported), I don't think this use case is unique, and there should be some solution to this problem that, unfortunately, I haven't been able to find so far.
I know I can get the outcome I am expecting using BigQuery's Standard SQL:
SELECT DISTINCT
p.product AS product,
LAST_VALUE(p.brand IGNORE NULLS) OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS brand,
LAST_VALUE((SELECT name FROM UNNEST(p.categories) WHERE taxonomy = "a") IGNORE NULLS) OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_a,
LAST_VALUE((SELECT name FROM UNNEST(p.categories) WHERE taxonomy = "b") IGNORE NULLS) OVER (PARTITION BY p.product ORDER BY p.date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_b
FROM
products AS p
I can also get this outcome if I break the query into two (one per category taxonomy) and join the "exploded" categories back to the original products view, but is there a way to get this in one query?
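For reference, the two-query workaround I mean looks roughly like the following sketch (the CTE names are only illustrative; it needs the explode plus extra joins):
WITH exploded AS (
    SELECT p.product, p.date, c.name, c.taxonomy
    FROM products AS p
    LATERAL VIEW EXPLODE(p.categories) t AS c
),
last_category_a AS (
    SELECT DISTINCT product,
        LAST_VALUE(name) IGNORE NULLS OVER (PARTITION BY product ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_a
    FROM exploded
    WHERE taxonomy = "a"
),
last_category_b AS (
    SELECT DISTINCT product,
        LAST_VALUE(name) IGNORE NULLS OVER (PARTITION BY product ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS category_b
    FROM exploded
    WHERE taxonomy = "b"
),
last_brand AS (
    SELECT DISTINCT product,
        LAST_VALUE(brand) IGNORE NULLS OVER (PARTITION BY product ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS brand
    FROM products
)
SELECT b.product, b.brand, a.category_a, c.category_b
FROM last_brand AS b
LEFT JOIN last_category_a AS a ON a.product = b.product
LEFT JOIN last_category_b AS c ON c.product = b.product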
CodePudding user response:
Given your df:
import json

# build the example dataframe from JSON strings
df = spark.read.json(spark.sparkContext.parallelize([
    {'product': 1, 'date': '2020-01-01', 'brand': 'b1', 'categories': [{'name': 'ca1', 'taxonomy': 'a'}, {'name': 'cb1', 'taxonomy': 'b'}]},
    {'product': 2, 'date': '2020-01-01', 'brand': 'b3', 'categories': [{'name': 'ca3', 'taxonomy': 'a'}]},
    {'product': 1, 'date': '2020-01-02', 'brand': 'b2', 'categories': [{'name': 'ca2', 'taxonomy': 'a'}, {'name': 'cb2', 'taxonomy': 'b'}]},
    {'product': 1, 'date': '2020-01-03', 'brand': None, 'categories': None}
]).map(json.dumps))

# register a temp view so the dataframe can be queried by name in spark.sql
df.createOrReplaceTempView('df')

df.show(truncate=False)
+-----+--------------------+----------+-------+
|brand|categories          |date      |product|
+-----+--------------------+----------+-------+
|b1   |[{ca1, a}, {cb1, b}]|2020-01-01|1      |
|b3   |[{ca3, a}]          |2020-01-01|2      |
|b2   |[{ca2, a}, {cb2, b}]|2020-01-02|1      |
|null |null                |2020-01-03|1      |
+-----+--------------------+----------+-------+
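The key trick is to convert the array of structs into a map keyed by taxonomy (via map_from_arrays), so each category can be looked up by key instead of being exploded. As a quick sanity check you can preview that intermediate column first (category_map is just an illustrative name):
spark.sql("""
select product, date, map_from_arrays(categories.taxonomy, categories.name) as category_map
from df
""").show(truncate=False)
For product 1 on 2020-01-01 this should give a map like {a -> ca1, b -> cb1}; for the row with null categories the map, and therefore any lookup on it, stays null.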
With that map in place, you can use the following SQL:
spark.sql("""
with df_with_maps as (
select product, brand, date, map_from_arrays(categories.taxonomy, categories.name) as category_map from df
)
select DISTINCT(product),
last(brand, true) over(PARTITION by product order by date asc rows BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as brand,
last(category_map['a'], true) over(PARTITION by product order by date asc rows BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as category_a,
last(category_map['b'], true) over(PARTITION by product order by date asc rows BETWEEN UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as category_b
from df_with_maps
""").show(truncate=False)
+-------+-----+----------+----------+
|product|brand|category_a|category_b|
+-------+-----+----------+----------+
|1      |b2   |ca2       |cb2       |
|2      |b3   |ca3       |null      |
+-------+-----+----------+----------+
This can obviously be written with the Python API as well; I just assume you prefer the raw SQL.
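For reference, a minimal sketch of the same approach in the DataFrame API (using the same df as above; result is just an illustrative name) could look like:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# full-history window per product, ordered by date
w = (Window.partitionBy('product')
           .orderBy('date')
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = (
    df
    # build the taxonomy -> name lookup map, as in the SQL version
    .withColumn('category_map',
                F.map_from_arrays(F.col('categories.taxonomy'), F.col('categories.name')))
    .select(
        'product',
        # last(..., ignorenulls=True) mirrors LAST(x, true) in the SQL
        F.last('brand', ignorenulls=True).over(w).alias('brand'),
        F.last(F.col('category_map').getItem('a'), ignorenulls=True).over(w).alias('category_a'),
        F.last(F.col('category_map').getItem('b'), ignorenulls=True).over(w).alias('category_b'),
    )
    .distinct()
)
result.show(truncate=False)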