I have an arbitrary number of arrays of equal length in a PySpark DataFrame. I need to coalesce these, element by element, into a single list. The problem with coalesce is that it doesn't work by element, but rather selects the entire first non-null array. Any suggestions for how to accomplish this would be appreciated. Please see the test case below for an example of expected input and output:
def test_coalesce_elements():
    """
    Test array coalescing on a per-element basis
    """
    from pyspark.sql import SparkSession
    import pyspark.sql.types as t
    import pyspark.sql.functions as f

    spark = SparkSession.builder.getOrCreate()

    data = [
        {
            "a": [None, 1, None, None],
            "b": [2, 3, None, None],
            "c": [5, 6, 7, None],
        }
    ]
    schema = t.StructType([
        t.StructField('a', t.ArrayType(t.IntegerType())),
        t.StructField('b', t.ArrayType(t.IntegerType())),
        t.StructField('c', t.ArrayType(t.IntegerType())),
    ])
    df = spark.createDataFrame(data, schema)

    # Inspect schema
    df.printSchema()
    # root
    #  |-- a: array (nullable = true)
    #  |    |-- element: integer (containsNull = true)
    #  |-- b: array (nullable = true)
    #  |    |-- element: integer (containsNull = true)
    #  |-- c: array (nullable = true)
    #  |    |-- element: integer (containsNull = true)

    # Inspect df values
    df.show(truncate=False)
    # +---------------------+------------------+---------------+
    # |a                    |b                 |c              |
    # +---------------------+------------------+---------------+
    # |[null, 1, null, null]|[2, 3, null, null]|[5, 6, 7, null]|
    # +---------------------+------------------+---------------+

    # This obviously does not work, but hopefully provides the general idea
    # Remember: this will need to work with an arbitrary and dynamic set of columns
    input_cols = ['a', 'b', 'c']
    df = df.withColumn('d', f.coalesce(*[f.col(i) for i in input_cols]))
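    # Why it fails: f.coalesce returns the first non-null *column value* per row, and the
    # array in column a is itself non-null, so this just yields a unchanged: [None, 1, None, None]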
    # This is the expected output I would like to see for the given inputs
    assert df.collect()[0]['d'] == [2, 1, 7, None]
Thanks in advance for any ideas!
CodePudding user response:
Although it would be ideal, I am not sure there is an elegant way to do this using only PySpark functions.
What I did is write a udf that takes a variable number of columns (using *args, which you can read about here) and returns an array of integers.
import pyspark.sql.functions as f
import pyspark.sql.types as t

@f.udf(returnType=t.ArrayType(t.IntegerType()))
def get_array_non_null_first_element(*args):
    data_array = list(args)
    array_lengths = [len(array) for array in data_array]
    # check that all of the arrays have the same length
    assert len(set(array_lengths)) == 1
    # if they do, then you can set the array length
    array_length = array_lengths[0]
    first_value_array = []
    for i in range(array_length):
        # gather the i-th element from every input array and keep the first non-null one
        element_array = [array[i] for array in data_array]
        value = None
        for x in element_array:
            if x is not None:
                value = x
                break
        first_value_array.append(value)
    return first_value_array
Then create a new column d by applying this udf to whichever columns you like:
df.withColumn("d", get_array_non_null_first_element(F.col('a'), F.col('b'), F.col('c'))).show()
+--------------------+------------------+---------------+---------------+
|                   a|                 b|              c|              d|
+--------------------+------------------+---------------+---------------+
|[null, 1, null, n...|[2, 3, null, null]|[5, 6, 7, null]|[2, 1, 7, null]|
+--------------------+------------------+---------------+---------------+
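Since the set of columns is dynamic, you can also unpack a list comprehension rather than naming each column:

df.withColumn("d", get_array_non_null_first_element(*[f.col(c) for c in input_cols])).show()

For completeness: if your Spark version exposes higher-order functions in Python (f.transform was added to the Python API in Spark 3.1), I believe a udf-free sketch along these lines should also work. It relies on f.arrays_zip naming the struct fields after the input columns, and I haven't profiled it against the udf:

import pyspark.sql.functions as f

input_cols = ['a', 'b', 'c']

# zip the arrays into a single array of structs, then coalesce each struct's fields element-wise
df = df.withColumn(
    'd',
    f.transform(
        f.arrays_zip(*[f.col(c) for c in input_cols]),
        lambda s: f.coalesce(*[s[c] for c in input_cols]),
    ),
)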