I have a column in a pyspark df that contains an array of maps like the below:
[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]
df.printSchema()
returns the following for the column:
|-- constituencies: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address: string (nullable = true)
| | |-- city: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- note: string (nullable = true)
| | |-- stateProvince: string (nullable = true)
And I want to nullify all those empty strings, so I thought this would be a perfect problem to solve with F.transform(col, f).
I created the function and then used it in the transform expression like below:
def nullify_vals(d):
    def nullify_string(str_):
        if str_.strip() == "":
            return None
        return str_.strip()

    return dict((k, nullify_string(v)) for k, v in d.items())
Note that the above works when tested on a dictionary:
dd = {"my": "map", "is": "", "not": " ", "entierly": " empty , right?"}
d_cln = nullify_vals(dd)
d_cln["not"] is None # returns True
But when I then use it in PySpark, it gives me an error:
import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))
TypeError: 'Column' object is not callable
These are the last lines of the stacktrace:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))
File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
4214 def transform(col, f):
4215 """
4216 Returns an array of elements after applying a transformation to each element in the input array.
4217
(...)
4258 --------------
4259 """
-> 4260 return _invoke_higher_order_function("ArrayTransform", [col], [f])
File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
4206 expr = getattr(expressions, name)
4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
   4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))
CodePudding user response:
Your function nullify_vals should take a Column object of type StructType, since your array elements are structs, but it is written as if it received plain Python objects.
Try rewriting it like this instead:
from typing import List

from pyspark.sql import functions as F, Column

def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    for f in fields:
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )
    return struct_col
For each field in the inner struct, we use the Column method withField to update it: if the trimmed value equals the empty string, we set it to null.
Applied to your input example:
json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))
You can get the list of the constituencies struct fields from the dataframe schema:
constituencies_fields = df.selectExpr("inline(constituencies)").columns
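Alternatively (a small sketch, assuming the schema printed in the question), the same list can be read directly from the dataframe schema without scanning the data:

# The array's element type is a StructType; its field names are what nullify_vals needs.
constituencies_fields = [
    f.name for f in df.schema["constituencies"].dataType.elementType.fields
]
# ['address', 'city', 'country', 'note', 'stateProvince']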
df1 = df.withColumn(
"constituencies",
F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)
df1.show(truncate=False)
# +----------------------------------------+
# |constituencies                          |
# +----------------------------------------+
# |[{Fadden, null, null, null, Queensland}]|
# +----------------------------------------+
CodePudding user response:
I'm still looking into the error you got and I'll update the post when I figure out what's wrong. In the meantime, you can do something like this to work around it:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
schema = ArrayType(
    StructType([
        StructField('address', StringType()),
        StructField('city', StringType()),
        StructField('country', StringType()),
        StructField('note', StringType()),
        StructField('stateProvince', StringType()),
    ]), True)
# Guard against null field values before calling .strip()
nullify_udf = udf(
    lambda arr: [
        [(v if v is not None and v.strip() != "" else None) for v in area]
        for area in arr
    ],
    schema,
)
result = kyclean.withColumn('constituencies', nullify_udf('constituencies'))
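As a quick sanity check (assuming the same single-row example dataframe), the workaround should produce the same result as the withField approach above:

result.show(truncate=False)
# +----------------------------------------+
# |constituencies                          |
# +----------------------------------------+
# |[{Fadden, null, null, null, Queensland}]|
# +----------------------------------------+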
The specific error you got is saying that you can't call d.items() as a function: inside transform, the d that gets passed to your function is a Column, so d.items is interpreted as a field reference (another Column), which is not callable. The function you pass in really needs to operate on that Column object.
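You can reproduce the mechanism outside of transform; a minimal sketch (the column name is just for illustration):

from pyspark.sql import functions as F

d = F.col("constituency")  # this is the kind of object your function receives inside transform
items = d.items            # attribute access on a Column means "the field named 'items'", another Column
print(type(items))         # <class 'pyspark.sql.column.Column'>
items()                    # TypeError: 'Column' object is not callable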
The description of pyspark.sql.functions.transform says, "Returns an array of elements after applying a transformation to each element in the input array." But inside the description of the accepted function f, it says, "...and can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052)." So it can't take in custom Python UserDefinedFunctions yet, which is sort of what you were trying to do.
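In other words, a lambda you pass to transform has to be built purely from Column methods and functions in pyspark.sql.functions. As a rough sketch (using a hypothetical array<string> column called tags rather than your struct case), this style works where the plain-Python nullify_vals does not:

from pyspark.sql import functions as F

# The lambda only composes Column expressions, so Spark can translate it to a SQL lambda.
df_ok = df.withColumn(
    "tags",
    F.transform("tags", lambda s: F.when(F.trim(s) == "", None).otherwise(F.trim(s))),
)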