I have the same situation as stated in this question.
df = spark.createDataFrame(
    [(1, "xx", [10, 20], ["a", "b"], ["p", "q"]),
     (2, "yy", [30, 40], ["c", "d"], ["r", "s"]),
     (3, "zz", None, ["f", "g"], ["e", "k"])],
    ["c1", "c2", "a1", "a2", "a3"])
df.show()
# +---+---+--------+------+------+
# | c1| c2|      a1|    a2|    a3|
# +---+---+--------+------+------+
# |  1| xx|[10, 20]|[a, b]|[p, q]|
# |  2| yy|[30, 40]|[c, d]|[r, s]|
# |  3| zz|    null|[f, g]|[e, k]|
# +---+---+--------+------+------+
I can't figure out a way to explode these arrays together correctly in PySpark. How can I achieve this result?
+---+---+----+---+---+
| c1| c2|   0|  1|  2|
+---+---+----+---+---+
|  1| xx|  10|  a|  p|
|  1| xx|  20|  b|  q|
|  2| yy|  30|  c|  r|
|  2| yy|  40|  d|  s|
|  3| zz|null|  f|  e|
|  3| zz|null|  g|  k|
+---+---+----+---+---+
Answer:
This should do it for a dynamic number of array columns:
from pyspark.sql import functions as F

# pick up every array-typed column dynamically
arr_cols = [c for c, t in df.dtypes if t.startswith("array")]

df = df.withColumn(
    "arr_of_struct",
    # coalesce turns a null array into [null] so arrays_zip pads the row
    # with nulls instead of the whole row being dropped
    F.arrays_zip(*[F.coalesce(c, F.array(F.lit(None))) for c in arr_cols])
).select(
    *[c for c in df.columns if c not in arr_cols],
    # inline explodes the array of structs: one row per element,
    # one output column per struct field
    F.expr("inline(arr_of_struct)")
)
df.show()
# +---+---+----+---+---+
# | c1| c2|   0|  1|  2|
# +---+---+----+---+---+
# |  1| xx|  10|  a|  p|
# |  1| xx|  20|  b|  q|
# |  2| yy|  30|  c|  r|
# |  2| yy|  40|  d|  s|
# |  3| zz|null|  f|  e|
# |  3| zz|null|  g|  k|
# +---+---+----+---+---+
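Two details worth noting. The F.coalesce(c, F.array(F.lit(None))) wrapper is what keeps the c1 = 3 row: arrays_zip returns null when any of its input arrays is null, and inline emits no rows for a null array, so replacing the null array with a one-element [null] array lets arrays_zip pad it with nulls instead of dropping the row. The output columns come out named 0, 1 and 2 because the zipped inputs are expressions rather than plain column references, so arrays_zip falls back to positional field names. If you want the original array names back, a minimal follow-up sketch (reusing df and arr_cols as left by the snippet above) is to rename the columns positionally with toDF:

# after the snippet above, df has columns c1, c2, 0, 1, 2;
# toDF renames them positionally back to c1, c2, a1, a2, a3
df = df.toDF("c1", "c2", *arr_cols)
df.show()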