Spark: How to transpose and explode columns with nested arrays


I applied an algorithm from the question below (see NOTE) to transpose and explode a nested Spark dataframe.

When I define cols = ['a', 'b'] I get an empty dataframe, but when I define cols = ['a'] I get the transformed dataframe for the a column. See the section Current code below for more details. Any help would be appreciated.

I'm looking for required output 2 (Transpose and Explode), but even an example of required output 1 (Transpose) would be very useful.

NOTE: This is a minimal example to highlight the problem; in reality the dataframe schema and array lengths vary, as in the example Pyspark: How to flatten nested arrays by merging values in spark.
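
Since the real schema varies, the array columns do not have to be hard-coded. A small sketch (my addition, not part of the original question) that derives cols from the dataframe's schema once the dataframe shown below is loaded:

from pyspark.sql.types import ArrayType

# Collect the names of all top-level array columns (here: ['a', 'b'])
cols = [field.name for field in df.schema.fields
        if isinstance(field.dataType, ArrayType)]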

Input df:

+---+------------------+--------+
| id|                 a|       b|
+---+------------------+--------+
|  1|[{1, 1}, {11, 11}]|    null|
|  2|              null|[{2, 2}]|
+---+------------------+--------+


root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

Required output 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
+---+------+-------------------+

Required output 2 (explode_df):

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
|  1|   a|   1|  1|
|  1|   a|  11| 11|
|  2|   b|   2|  2|
+---+----+----+---+

Current code:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}}"""]))

cols = ['a', 'b']

# One TRANSFORM per column: tag each element with the name of its source column
expressions = [f.expr('TRANSFORM({col}, el -> STRUCT("{col}" AS cols, el.date, el.val))'.format(col=col)) for col in cols]

transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))

explode_df = transpose_df.selectExpr('id', 'inline(arrays)')

explode_df.show()

Current outcome:

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+

CodePudding user response:

stack might be a better option than transpose for the first step.
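
For context: stack(n, expr1, ..., exprk) separates the expressions into n rows per input row, and names its output columns col0, col1, ... unless told otherwise. Each pair below supplies a literal column name ('a' or 'b') and the matching array column, which is exactly the transpose step.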


expr = f"stack({len(cols)},"   \
    ",".join([f"'{c}',{c}" for c in cols])   \
    ")"
#expr = stack(2,'a',a,'b',b)

# stack names its output columns col0, col1, ... by default
transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("arrays is not null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
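
With the sample input, transpose_df matches required output 1, and explode_df.show() prints required output 2.

As for why the original code came back empty: TRANSFORM(b, ...) evaluates to null for id 1 (and TRANSFORM(a, ...) for id 2), and flatten returns null whenever the outer array contains a null element, so inline(arrays) emits no rows. If you would rather keep the TRANSFORM/flatten approach, here is a minimal sketch of a repair, assuming it is acceptable to coalesce each missing column to an empty array first:

# Coalesce each null column to an empty array so flatten never sees a null element
expressions = [
    f.expr(
        "TRANSFORM(coalesce({col}, array()), "
        "el -> STRUCT('{col}' AS cols, el.date, el.val))".format(col=col)
    )
    for col in cols
]

transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')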