How to coalesce multiple pyspark arrays?

I have an arbitrary number of arrays of equal length in a PySpark DataFrame. I need to coalesce these, element by element, into a single list. The problem with coalesce is that it doesn't operate per element; it simply returns the entire first non-null array. Any suggestions for how to accomplish this would be appreciated. Please see the test case below for an example of expected input and output:

def test_coalesce_elements():
    """
    Test array coalescing on a per-element basis
    """
    from pyspark.sql import SparkSession
    import pyspark.sql.types as t
    import pyspark.sql.functions as f

    spark = SparkSession.builder.getOrCreate()

    data = [
        {
            "a": [None, 1, None, None],
            "b": [2, 3, None, None],
            "c": [5, 6, 7, None],
        }
    ]

    schema = t.StructType([
        t.StructField('a', t.ArrayType(t.IntegerType())),
        t.StructField('b', t.ArrayType(t.IntegerType())),
        t.StructField('c', t.ArrayType(t.IntegerType())),
    ])
    df = spark.createDataFrame(data, schema)

    # Inspect schema
    df.printSchema()
    # root
    #  |-- a: array (nullable = true)
    #  |    |-- element: integer (containsNull = true)
    #  |-- b: array (nullable = true)
    #  |    |-- element: integer (containsNull = true)
    #  |-- c: array (nullable = true)
    #  |    |-- element: integer (containsNull = true)

    # Inspect df values
    df.show(truncate=False)
    # +---------------------+------------------+---------------+
    # |a                    |b                 |c              |
    # +---------------------+------------------+---------------+
    # |[null, 1, null, null]|[2, 3, null, null]|[5, 6, 7, null]|
    # +---------------------+------------------+---------------+

    # This obviously does not work, but hopefully provides the general idea
    # Remember: this will need to work with an arbitrary and dynamic set of columns
    input_cols = ['a', 'b', 'c']
    df = df.withColumn('d', f.coalesce(*[f.col(i) for i in input_cols]))

    # This is the expected output I would like to see for the given inputs
    assert df.collect()[0]['d'] == [2, 1, 7, None]

Thanks in advance for any ideas!

CodePudding user response:

Although it would be ideal, I'm not sure there is an elegant way to do this using only built-in PySpark functions.

What I did was write a UDF that takes a variable number of columns (using *args) and returns an array of integers.

import pyspark.sql.functions as f
import pyspark.sql.types as t

@f.udf(returnType=t.ArrayType(t.IntegerType()))
def get_array_non_null_first_element(*args):
    """Element-wise coalesce: take the first non-null value at each position."""
    data_array = list(args)
    array_lengths = [len(array) for array in data_array]

    # check that all of the arrays have the same length
    assert len(set(array_lengths)) == 1

    # if they do, then you can set the array length
    array_length = array_lengths[0]

    first_value_array = []
    for i in range(array_length):
        # first non-null value at position i, or None if every array is null there
        value = None
        for array in data_array:
            if array[i] is not None:
                value = array[i]
                break
        first_value_array.append(value)
    return first_value_array

Then create a new column d by applying this UDF to whichever columns you like:

df.withColumn("d", get_array_non_null_first_element(f.col('a'), f.col('b'), f.col('c'))).show()

+--------------------+------------------+---------------+---------------+
|                   a|                 b|              c|              d|
+--------------------+------------------+---------------+---------------+
|[null, 1, null, n...|[2, 3, null, null]|[5, 6, 7, null]|[2, 1, 7, null]|
+--------------------+------------------+---------------+---------------+
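
Side note: because the UDF accepts *args, it also works with a dynamic column list, e.g. get_array_non_null_first_element(*[f.col(c) for c in input_cols]).

On newer Spark versions (3.1+ for the Python lambda form of transform) you may also be able to avoid the UDF entirely with built-in higher-order functions: zip the arrays position-wise with arrays_zip, then coalesce the fields of each resulting struct. A rough sketch, assuming the inputs are top-level array columns of equal length and input_cols is the same dynamic list used in the question:

import pyspark.sql.functions as f

input_cols = ['a', 'b', 'c']  # any dynamic list of array column names

# arrays_zip groups the i-th elements of every array into one struct per position;
# transform then coalesces each struct's fields, i.e. keeps the first non-null value
df = df.withColumn(
    'd',
    f.transform(
        f.arrays_zip(*[f.col(c) for c in input_cols]),
        lambda s: f.coalesce(*[s[c] for c in input_cols]),
    ),
)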