Pyspark use sql.transform to nullify all empty strings in a column containing an array of structs

Time:02-21

I have a column in a PySpark DataFrame that contains an array of structs like the one below:

[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]

df.printSchema() returns the following for the column:

 |-- constituencies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |    |-- stateProvince: string (nullable = true)

I want to replace all of those empty strings with null, and this looked like a perfect problem to solve with F.transform(col, f).

So I wrote the function below and then used it in the transform expression:

def nullify_vals(d):
  def nullify_string(str_):
    if str_.strip() == "":
      return None
    return str_.strip()
  
  return (
    dict((k, nullify_string(v)) for k, v in d.items())  
  )

Note that the above works when tested on a dictionary:

dd = {"my": "map", "is": "", "not": "   ", "entierly": "  empty , right?"}
d_cln = nullify_vals(dd)  
d_cln["not"] is None # returns True

But when I then use it in PySpark, it raises an error:

import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))

TypeError: 'Column' object is not callable

These are the last lines of the stacktrace:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))

File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
   4214 def transform(col, f):
   4215     """
   4216     Returns an array of elements after applying a transformation to each element in the input array.
   4217 
   (...)
   4258      -------------- 
   4259     """
-> 4260     return _invoke_higher_order_function("ArrayTransform", [col], [f])

File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
   4206 expr = getattr(expressions, name)
   4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
   4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))

CodePudding user response:

Your function nullify_vals should take a Column object of struct type, because the elements of your array are structs. Instead, it is written as if it received a plain Python dict.

Try rewriting it like this instead:

from typing import List

from pyspark.sql import functions as F, Column

def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    for f in fields:
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )

    return struct_col

For each field in the inner struct, we use the Column method withField to replace its value: if the trimmed value is an empty string, the field is set to null; otherwise the original value is kept.

Applied to your input example:

json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))

You can get the list of field names of the constituencies struct by expanding the array with inline and reading the resulting column names:

constituencies_fields = df.selectExpr("inline(constituencies)").columns

df1 = df.withColumn(
    "constituencies",
    F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)

df1.show(truncate=False)
#+----------------------------------------+
#|constituencies                          |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+

CodePudding user response:

I'm still looking into the error you got and I'll update the post when I figure out what's wrong. In the meantime, you can work around it with a Python UDF like this:

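from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Return type of the UDF: an array of structs with the same fields as the input
schema = ArrayType(
    StructType([
        StructField('address', StringType()),
        StructField('city', StringType()),
        StructField('country', StringType()),
        StructField('note', StringType()),
        StructField('stateProvince', StringType()),
    ]), True)

def nullify(arr):
    # Each array element arrives as a Row; iterating it yields the field values in order.
    # Null arrays, null elements and null field values are passed through as null.
    if arr is None:
        return None
    return [
        None if area is None
        else [None if v is None or v.strip() == "" else v for v in area]
        for area in arr
    ]

nullify_udf = udf(nullify, schema)

result = kyclean.withColumn('constituencies', nullify_udf('constituencies'))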

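The specific error occurs because transform calls your function with a Column object, not a Python dict. On a Column, d.items is treated as a reference to a struct field named items and evaluates to another Column, so calling d.items() fails with "'Column' object is not callable". The function you pass to transform has to build a Column expression from the Column d that it receives.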
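
To make that mechanism concrete without needing a Spark session, here is a toy model. ToyColumn is a hypothetical stand-in, not the real pyspark.sql.Column; it only imitates the one behaviour that matters here, namely that unknown attributes are resolved as field references that return another column-like object.

```python
class ToyColumn:
    """Hypothetical stand-in for pyspark.sql.Column (illustration only)."""

    def __init__(self, expr):
        self.expr = expr

    def __getattr__(self, name):
        # Called only for attributes that are not found normally.
        # Unknown names are treated as struct field references.
        if name.startswith("__"):
            raise AttributeError(name)
        return ToyColumn(f"{self.expr}.{name}")


d = ToyColumn("x")
items = d.items  # a ToyColumn for the field "items", not a dict method

try:
    items()  # the object has no __call__, so this raises TypeError
except TypeError as err:
    message = str(err)

print(items.expr)
print(message)
```

The same shape of failure shows up in the question: nullify_vals receives a column, d.items resolves to a field reference, and the call on it raises the TypeError.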

The description of pyspark.sql.functions.transform says, "Returns an array of elements after applying a transformation to each element in the input array."

But in the description of the accepted function, f, it says, "...and can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052)." So transform can't run arbitrary Python logic or Python UDFs on the elements yet, which is more or less what you were trying to do.
