I applied the algorithm from the question linked in the NOTE below to transpose and explode a nested Spark dataframe.
When I define cols = ['a', 'b'] I get an empty dataframe, but when I define cols = ['a']
I get the transformed dataframe for the a
column. See the section Current code below for more details. Any help would be appreciated.
I'm looking for Required output 2 (transpose and explode), but even an example of Required output 1 (transpose) would be very useful.
NOTE: This is a minimal example to highlight the problem; in reality the dataframe schema and array lengths vary, as in the example Pyspark: How to flatten nested arrays by merging values in spark
Input df:
+---+------------------+--------+
| id|                 a|       b|
+---+------------------+--------+
|  1|[{1, 1}, {11, 11}]|    null|
|  2|              null|[{2, 2}]|
+---+------------------+--------+
root
|-- id: long (nullable = true)
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
Required output 1 (transpose_df):
+---+----+------------------+
| id|cols|            arrays|
+---+----+------------------+
|  1|   a|[{1, 1}, {11, 11}]|
|  2|   b|          [{2, 2}]|
+---+----+------------------+
Required output 2 (explode_df):
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
|  1|   a|   1|  1|
|  1|   a|  11| 11|
|  2|   b|   2|  2|
+---+----+----+---+
Current code:
import pyspark.sql.functions as f
df = spark.read.json(sc.parallelize([
    """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
    """{"id":2,"b":[{"date":2,"val":2}]}"""]))
cols = ['a', 'b']
expressions = [f.expr('TRANSFORM({col}, el -> STRUCT("{col}" AS cols, el.date, el.val))'.format(col=col)) for col in cols ]
transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')
explode_df.show()
Current outcome:
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+
CodePudding user response:
stack might be a better option than transpose
for the first step.
expr = f"stack({len(cols)}," + \
       ",".join([f"'{c}',{c}" for c in cols]) + \
       ")"
# expr == "stack(2,'a',a,'b',b)"
transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("arrays is not null")
explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')