I have the following code and its output.
import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val grouped = df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
grouped.show(false)
grouped.printSchema
Output and Schema:
+----+---------------------------------------------+
|name|foods                                        |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]]                  |
+----+---------------------------------------------+
root
|-- name: string (nullable = true)
|-- foods: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- food: string (nullable = true)
| | |-- price: double (nullable = false)
I want to filter based on df("foods.price") > 1.00. How do I filter this to get the output below?
+----+---------------------------------------------+
|name|foods                                        |
+----+---------------------------------------------+
|john|[[banana,1.29], [tomato,1.99]]               |
|bill|[[taco,2.59]]                                |
+----+---------------------------------------------+
I have tried df.filter($"foods.price" > 1.00), but this does not work; it throws an error. Is there anything else I can try?
CodePudding user response:
You are applying the filter to an array column: after the aggregation, foods.price resolves to an array of prices, so comparing it to a scalar fails. Instead, filter on price before the groupBy and then aggregate as needed.
val cf = df.filter("price > 1.0")
  .groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
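If you would rather keep the original aggregation and prune the array itself, Spark's filter higher-order function can do that via expr. This is a minimal sketch, assuming Spark 2.4 or later (the question does not state a version) and the grouped DataFrame from the question (named grouped above):
import org.apache.spark.sql.functions.expr
// Keep only the array elements whose price is greater than 1.00
val pruned = grouped.withColumn("foods", expr("filter(foods, x -> x.price > 1.0)"))
pruned.show(false)
Note that this variant keeps a name even if all of its foods are filtered out (the row would then carry an empty array), whereas filtering on price before the groupBy drops such names entirely.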