I want to keep only those elements of an ArrayType column that are present in a given list. For instance, if my Spark dataframe is:
val df = sc.parallelize(Seq(Seq("A", "B"),Seq("C","X"),Seq("A", "B", "C", "D", "Z"))).toDF("column")
+---------------+
|         column|
+---------------+
|         [A, B]|
|         [C, X]|
|[A, B, C, D, Z]|
+---------------+
and I have a dictionary_list = ["A", "Z", "X", "Y"], I want my output dataframe to be:
val outp_df
+---------------+
|         column|
+---------------+
|            [A]|
|            [X]|
|         [A, Z]|
+---------------+
I tried array_contains, arrays_overlap, etc., but the output I'm getting is like this:
val result = df.where(array_contains(col("column"), "A"))
+---------------+
|         column|
+---------------+
|         [A, B]|
|[A, B, C, D, Z]|
+---------------+
The rows are getting filtered, but I want to filter inside the list/row itself. Any way I can do this?
CodePudding user response:
The result you're getting makes sense: where only selects the rows whose array contains "A", which are the first and the last row. What you need is a function (not a SQL row filter) that takes a sequence of strings and returns a sequence containing only the values that exist in your dictionary. You can use a UDF like this:
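// the values to keep (the dictionary_list from the question)
val dictionaryList = Seq("A", "Z", "X", "Y")
// rename this as you wish
val myCustomFilter: Seq[String] => Seq[String] =
  input => input.filter(dictionaryList.contains)
// register it with your Spark session so it can be called from SQL expressions
// rename "custom_filter" as you wish
spark.udf.register("custom_filter", myCustomFilter)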
Then you need the select operator, not a filter operator. A filter (where) only keeps the rows that satisfy the predicate, which is not what you want.
Spark shell result:
scala> df.select(expr("custom_filter(column)")).show
+--------------+
|cusfil(column)|
+--------------+
|           [A]|
|           [X]|
|        [A, Z]|
+--------------+
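A quick way to sanity-check the per-row logic of the UDF is to run the same function on plain Scala collections, without Spark. This is only a sketch: the names dictionary, keepKnown and rows are made up for illustration, and the sample data is copied from the question.

```scala
// Hypothetical names, for illustration only.
// A Set is used here so that membership checks do not scan the whole list.
val dictionary: Set[String] = Set("A", "Z", "X", "Y")

// Same shape as the UDF body: keep only the elements found in the dictionary.
val keepKnown: Seq[String] => Seq[String] =
  input => input.filter(dictionary.contains)

// The three arrays from the question's dataframe.
val rows = Seq(
  Seq("A", "B"),
  Seq("C", "X"),
  Seq("A", "B", "C", "D", "Z")
)

// Apply the function to every row, as select would do with the UDF.
val filtered = rows.map(keepKnown)
filtered.foreach(r => println(r.mkString("[", ", ", "]")))
```

The element order inside each array is preserved, because filter walks the input sequence from left to right.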
CodePudding user response:
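Use the built-in filter higher-order function (available in Spark SQL since 2.4), which filters the elements inside each array and needs no UDF: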
val df1 = df.withColumn(
"column",
expr("filter(column, x -> x in ('A', 'Z', 'X', 'Y'))")
)
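If you would rather not hardcode the values inside a SQL string, the same filter can probably be written with the Scala API. A minimal sketch, assuming Spark 3.0 or later (where functions.filter accepts a Scala lambda) and a local SparkSession created just for this example; dictionaryList is the list from the question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, filter}

// Assumption: a local session for experimenting; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("filter-array-elements")
  .getOrCreate()
import spark.implicits._

// Sample data and dictionary taken from the question.
val df = Seq(Seq("A", "B"), Seq("C", "X"), Seq("A", "B", "C", "D", "Z")).toDF("column")
val dictionaryList = Seq("A", "Z", "X", "Y")

// Keep, inside each array, only the elements that appear in dictionaryList.
val df1 = df.withColumn(
  "column",
  filter(col("column"), x => x.isin(dictionaryList: _*))
)
df1.show()
```

array_intersect (Spark 2.4+) is another option when the dictionary is available as an array column, but it also removes duplicate elements from the result, so it is not an exact replacement for filter.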