I start with the following table:
|date | first_cat | second_cat | price_change|
|:----------|:----------|:----------|------------:|
|30/05/2022 | old | test_2 | 0.94|
|31/08/2022 | old | test_3 | 1.24|
|30/05/2022 | old | test_2 | 0.90|
|31/08/2022 | old | test_3 | 1.44|
|30/05/2022 | new | test_1 | 1.94|
|30/06/2022 | new | test_4 | 0.54|
|31/07/2022 | new | test_5 | 1.94|
|30/06/2022 | new | test_4 | 0.96|
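For reference, this sample data can be reproduced as a temp view named my_table roughly like this (a sketch assuming a spark-shell session, with dates kept as strings and price_change as double):
import spark.implicits._

// sketch: spark.implicits._ is already in scope in spark-shell; dates kept as plain strings
val my_table = Seq(
  ("30/05/2022", "old", "test_2", 0.94),
  ("31/08/2022", "old", "test_3", 1.24),
  ("30/05/2022", "old", "test_2", 0.90),
  ("31/08/2022", "old", "test_3", 1.44),
  ("30/05/2022", "new", "test_1", 1.94),
  ("30/06/2022", "new", "test_4", 0.54),
  ("31/07/2022", "new", "test_5", 1.94),
  ("30/06/2022", "new", "test_4", 0.96)
).toDF("date", "first_cat", "second_cat", "price_change")

my_table.createOrReplaceTempView("my_table")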
I want to calculate the product of price_change grouped by date, first_cat and second_cat:
|date | first_cat | second_cat | price_aggr |
|:----------|:----------|:----------|------------:|
|30/05/2022 | old | test_2 | 0.94*0.9|
|31/08/2022 | old | test_3 | 1.24*1.44|
|30/05/2022 | new | test_1 | 1.94|
|30/06/2022 | new | test_4 | 0.54*0.96|
|31/07/2022 | new | test_5 | 1.94|
I did it with:
SELECT
  date,
  first_cat,
  second_cat,
  array_join(collect_list(price_change), "*") as price_aggr
FROM my_table
GROUP BY
  date,
  first_cat,
  second_cat
However, that produces a text expression in the table, whereas I would like the expression to be evaluated, so the desired result is as follows:
|date | first_cat | second_cat | price_aggr |
|:----------|:----------|:----------|------------:|
|30/05/2022 | old | test_2 | 0.846|
|31/08/2022 | old | test_3 | 1.7856|
|30/05/2022 | new | test_1 | 1.94|
|30/06/2022 | new | test_4 | 0.5184|
|31/07/2022 | new | test_5 | 1.94|
I saw some ideas, but they use Pandas and other methods that fall outside Spark SQL: Cumulative product in Spark

I need to do it in Spark SQL alone; I'd like to avoid conversion to Pandas and UDFs.
Many thanks!
CodePudding user response:
You need to aggregate price_change by multiplying all values within each group. With a UDF and the DataFrame API it's pretty straightforward:
import org.apache.spark.sql.functions.{collect_list, udf}
import spark.implicits._  // for the $"col" column syntax

val product = udf { pcs: Seq[Double] => pcs.reduce(_ * _) }  // multiply all collected values
my_table.groupBy($"date", $"first_cat", $"second_cat")
  .agg(product(collect_list($"price_change")).as("price_aggr"))
  .show
You can go with SQL also:
val product = udf { pcs: Seq[Double] => pcs.reduce(_ * _) }
spark.udf.register("product", product)
spark.sql("""
SELECT date, first_cat, second_cat, product(collect_list(price_change)) AS price_aggr
FROM my_table
GROUP BY date, first_cat, second_cat
""").show
Well... if you strictly want to avoid UDFs and care less about readability, this will work too:
SELECT date, first_cat, second_cat,
exp(sum(ln(price_change))) as price_aggr
FROM my_table
GROUP BY date, first_cat, second_cat;
It makes use of a simple transformation: adding the natural logarithms of the values and then exponentiating the sum is equivalent to multiplying them. It's not super readable, and beware of potential precision loss (and note that ln is only defined for positive price_change values) - your choice.
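As a quick sanity check on the 30/05/2022 / old / test_2 group from the sample data: exp(ln(0.94) + ln(0.90)) = exp(-0.0619 - 0.1054) = exp(-0.1673) ≈ 0.846, which matches 0.94 * 0.90.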