I want to modify/filter a property inside a struct. Say I have a dataframe with the following column:
# +----------------------------------------+
#| arrayCol                                |
# +----------------------------------------+
#| {"a" : "some_value", "b" : [1, 2, 3]}   |
# +----------------------------------------+
Schema:
struct<a:string, b:array<int>>
I want to filter out the values in the 'b' property where the value inside the array == 1.
The desired result is the following:
# +----------------------------------------+
#| arrayCol                                |
# +----------------------------------------+
#| {"a" : "some_value", "b" : [2, 3]}      |
# +----------------------------------------+
Is it possible to do this without extracting the property, filtering the values, and re-building another struct?
CodePudding user response:
One way would be to define a UDF:
Example:
import ast
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, MapType
def remove_value(col):
    # 'b' arrives as a string like "[1, 2, 3]"; parse it, drop the 1s, re-serialize
    col["b"] = str([x for x in ast.literal_eval(col["b"]) if x != 1])
    return col
if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [
            {
                "arrayCol": {
                    "a": "some_value",
                    "b": "[1, 2, 3]",
                },
            },
        ]
    )
    remove_value_udf = spark.udf.register(
        "remove_value_udf", remove_value, MapType(StringType(), StringType())
    )
    df = df.withColumn(
        "result",
        remove_value_udf(F.col("arrayCol")),
    )
    df.printSchema()
    df.show(truncate=False)
Result:
root
|-- arrayCol: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- result: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
+---------------------------------+------------------------------+
|arrayCol                         |result                        |
+---------------------------------+------------------------------+
|{a -> some_value, b -> [1, 2, 3]}|{a -> some_value, b -> [2, 3]}|
+---------------------------------+------------------------------+
CodePudding user response:
AFAIK, Spark doesn't support adding/updating fields in nested structures directly. To update a struct column, you'll need to create a new struct from the existing fields plus the updated ones.
In your case, you can update the struct column by using the filter function on the b field like this:
import pyspark.sql.functions as F

df1 = df.withColumn(
    "arrayCol",
    F.struct(
        # keep 'a' as-is, rebuild 'b' with the higher-order filter function
        F.col("arrayCol.a").alias("a"),
        F.expr("filter(arrayCol.b, x -> x != 1)").alias("b")
    )
)
df1.show()
# +--------------------+
#|            arrayCol|
# +--------------------+
#|{some_value, [2, 3]}|
# +--------------------+