Rearrange Array of Struct to Array of a Struct with a field as Array in Pyspark

I have a "simple" dataframe array of struct(nome,h_0,h_1,....h_23) and I want rearrange this column as array of struct(nome, array(h_0,h_1....h_23))

as-is:

root
 |-- array_area: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- nome: string (nullable = true)
 |    |    |-- h_0: string (nullable = true)
 |    |    |-- h_1: string (nullable = true)
 |    |    |-- h_10: string (nullable = true)
 |    |    |-- h_11: string (nullable = true)
 |    |    |-- h_12: string (nullable = true)
 |    |    |-- h_13: string (nullable = true)
 |    |    |-- h_14: string (nullable = true)
 |    |    |-- h_15: string (nullable = true)
 |    |    |-- h_16: string (nullable = true)
 |    |    |-- h_17: string (nullable = true)
 |    |    |-- h_18: string (nullable = true)
 |    |    |-- h_19: string (nullable = true)
 |    |    |-- h_2: string (nullable = true)
 |    |    |-- h_20: string (nullable = true)
 |    |    |-- h_21: string (nullable = true)
 |    |    |-- h_22: string (nullable = true)
 |    |    |-- h_23: string (nullable = true)
 |    |    |-- h_3: string (nullable = true)
 |    |    |-- h_4: string (nullable = true)
 |    |    |-- h_5: string (nullable = true)
 |    |    |-- h_6: string (nullable = true)
 |    |    |-- h_7: string (nullable = true)
 |    |    |-- h_8: string (nullable = true)
 |    |    |-- h_9: string (nullable = true)

I want:

root
 |-- array_area: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- nome: string (nullable = true)
 |    |    |-- circadiana: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

I've tried a UDF like this:

concat_udf = F.udf(
    lambda arr: F.array(
        F.struct(
            x["nome"],
            F.array(x["h_0"], x["h_1"], x["h_2"], x["h_3"], x["h_4"], x["h_5"],
                    x["h_6"], x["h_7"], x["h_8"], x["h_9"], x["h_10"], x["h_11"],
                    x["h_12"], x["h_13"], x["h_14"], x["h_15"], x["h_16"], x["h_17"],
                    x["h_18"], x["h_19"], x["h_20"], x["h_21"], x["h_22"], x["h_23"])
        ) for x in arr
    ),
    ArrayType(StructType([StructField("nome", StringType(), True),
                          StructField("circadiana", ArrayType(StringType()), True)])))

printSchema() looks fine, but when I view the data with show():

df_new=df.withColumn("area_provenienza_X",concat_udf(F.col("array_area"))).show()

I get this error:

  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/worker.py", line 83, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "LoadFileSIMO112_dati_aggregati.py", line 150, in <lambda>
    x["h_23"])) for x in arr),
  File "/opt/giotto/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1869, in array
    jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
AttributeError: 'NoneType' object has no attribute '_jvm'

Example Data:

"area": [{
          "nome": "Extra",
          "h_0": "0",
          "h_1": "0",
          "h_2": "0",
          "h_3": "0",
          "h_4": "0",
          "h_5": "0",
          "h_6": "1",
          "h_7": "0",
          "h_8": "0",
          "h_9": "0",
          "h_10": "1",
          "h_11": "1",
          "h_12": "0",
          "h_13": "1",
          "h_14": "0",
          "h_15": "0",
          "h_16": "0",
          "h_17": "1",
          "h_18": "0",
          "h_19": "1",
          "h_20": "0",
          "h_21": "1",
          "h_22": "0",
          "h_23": "1"
        },
        {
          "nome": "ROMA CAP",
          "h_0": "130",
          "h_1": "94",
          "h_2": "116",
          "h_3": "61",
          "h_4": "54",
          "h_5": "47",
          "h_6": "58",
          "h_7": "57",
          "h_8": "87",
          "h_9": "0",
          "h_10": "0",
          "h_11": "0",
          "h_12": "0",
          "h_13": "0",
          "h_14": "0",
          "h_15": "0",
          "h_16": "0",
          "h_17": "0",
          "h_18": "0",
          "h_19": "0",
          "h_20": "0",
          "h_21": "0",
          "h_22": "0",
          "h_23": "124"
        }]

I want:

"area": [{
          "nome": "Extra",
          "circadiana":[0,0,0,0,0,0,1,0,0,0,1,1,0,1,0,0,0,1,0,1,0,1,0,1]
        },
        {
          "nome": "ROMA CAP",
          "circadiana":[130,94,116,61,54,47,58,87,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,124]
        }]

There are no null values in my dataframe (df). Thanks for your help. Regards

CodePudding user response:

You can't call DataFrame functions like F.array and F.struct inside a UDF: they build Column expressions through the driver's SparkContext, which doesn't exist on the executors where the UDF runs, hence AttributeError: 'NoneType' object has no attribute '_jvm'. Instead, you can use Spark's built-in transform function (available in SQL expressions since Spark 2.4) to convert each element of the array into the desired struct.

First, get all the h_x field names present in the structs, sorted numerically so the hours end up in order:

import pyspark.sql.functions as F

# inline(area) explodes the array of structs into one column per struct field.
# Sort the hour columns numerically: the struct lists them alphabetically
# (h_0, h_1, h_10, h_11, ...), which would otherwise scramble circadiana.
h_fields = sorted(
    (c for c in df.select(F.expr("inline(area)")).columns if c != "nome"),
    key=lambda c: int(c.split("_")[1]),
)
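For the schema above, that gives the 24 hour columns in numeric order:

print(h_fields)
# ['h_0', 'h_1', 'h_2', ..., 'h_23']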

Then, using the transform function on the area array, we build a struct with two fields for each element: the first holds nome, and the second is an array circadiana assembled from all the other fields (h_0, h_1, ...):

transform_expr = f"""
    transform(area, 
              x -> struct(
                        x.nome as nome, 
                        array({','.join([f'x.{c}' for c in h_fields])}) as circadiana
                   )
    )
"""

df1 = df.withColumn("area", F.expr(transform_expr))

df1.printSchema()

#root
# |-- area: array (nullable = true)
# |    |-- element: struct (containsNull = false)
# |    |    |-- nome: string (nullable = true)
# |    |    |-- circadiana: array (nullable = false)
# |    |    |    |-- element: string (containsNull = true)
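To sanity-check against the example data, explode the result the same way inline was used above; for the first element (nome = "Extra"), circadiana should come out as [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1]:

df1.selectExpr("inline(area)").show(truncate=False)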

If the list of h_x fields is fixed, you can simply use:

transform_expr = f"""
   transform(area, 
          x -> struct(
                    x.nome as nome, 
                    array({','.join([f'x.h_{i}' for i in range(24)])}) as circadiana
               )
   )
"""