I have the sample data frame below, where I need to filter colA based on the contents of colB.
Schema for the Input
|-- colA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- type: string (nullable = true)
|-- colB: array (nullable = true)
| |-- element: string (containsNull = true)
| colA | colB |
| ------------------------------------- | -------------- |
| [{ABC, Completed}, {DEF, Pending}] | [ABC, GHI] |
| [{ABC, Completed}, {GHI, Failure}] | [ABC, GHI] |
| [{ABC, Completed}, {DEF, Pending}] | [ABC] |
So, I'm looking for the output below:
| colA                                | colB       | colC                                |
| ----------------------------------- | ---------- | ----------------------------------- |
| [{ABC, Completed}, {DEF, Pending}]  | [ABC, GHI] | [{ABC, Completed}]                  |
| [{ABC, Completed}, {GHI, Failure}]  | [ABC, GHI] | [{ABC, Completed}, {GHI, Failure}]  |
| [{ABC, Completed}, {DEF, Pending}]  | [ABC]      | [{ABC, Completed}]                  |
I was able to figure out the logic using higher-order functions when colB is a plain string. Below is a code snippet for that case; I need help extending it to the case where colB is an array of strings.
inputDF
  .withColumn(
    "colC",
    expr(
      "filter(colA, colA_struct -> colA_struct.id == colB)"
    )
  )
CodePudding user response:
Use the array_contains function to check whether colB contains colA.id, and then use the filter higher-order function to keep only the matching elements of colA; that gives you colC.
import pyspark.sql.functions as F
......
df = df.withColumn('colC', F.expr('filter(colA, x -> array_contains(colB, x.id))'))
df.show(truncate=False)
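For reference, here is a minimal end-to-end sketch. The SparkSession creation and the hand-built sample rows are only there to make the snippet self-contained; the column names, struct fields, and sample values are taken from the question.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Recreate the sample input from the question: colA is an array of
# (id, type) structs, colB is an array of strings.
data = [
    ([("ABC", "Completed"), ("DEF", "Pending")], ["ABC", "GHI"]),
    ([("ABC", "Completed"), ("GHI", "Failure")], ["ABC", "GHI"]),
    ([("ABC", "Completed"), ("DEF", "Pending")], ["ABC"]),
]
df = spark.createDataFrame(
    data,
    "colA array<struct<id:string,type:string>>, colB array<string>",
)

# Keep only the structs in colA whose id appears in colB.
df = df.withColumn("colC", F.expr("filter(colA, x -> array_contains(colB, x.id))"))
df.show(truncate=False)

With the sample rows above, colC should come out as in the expected-output table in the question.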