I have two columns in a Spark Scala DataFrame where each is an Array[Struct].
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Sample data: each row has a name plus two Array[Struct] columns
val arrayStructData = Seq(
  Row("James",   List(Row("Java", "X", 120)), List(Row("Scala", "A", 300))),
  Row("Michael", List(Row("Java", "Y", 200)), List(Row("Scala", "B", 500))),
  Row("Robert",  List(Row("Java", "Z", 400)), List(Row("Scala", "C", 250)))
)

// col1 and col2 share the same element struct: (name, author, pages)
val arrayStructSchema = new StructType()
  .add("name", StringType)
  .add("col1", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))
  .add("col2", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)
df.printSchema()
df.show(false)
This is my input DataFrame df:
+-------+----------------+-----------------+
|name   |col1            |col2             |
+-------+----------------+-----------------+
|James  |[[Java, X, 120]]|[[Scala, A, 300]]|
|Michael|[[Java, Y, 200]]|[[Scala, B, 500]]|
|Robert |[[Java, Z, 400]]|[[Scala, C, 250]]|
+-------+----------------+-----------------+
I am trying to concatenate the two columns into one. The output should look like this:
+-------+---------------------------------+
|name   |col_merged                       |
+-------+---------------------------------+
|James  |[[Java, X, 120], [Scala, A, 300]]|
|Michael|[[Java, Y, 200], [Scala, B, 500]]|
|Robert |[[Java, Z, 400], [Scala, C, 250]]|
+-------+---------------------------------+
I found a similar post here: Merge two columns of type Array[string] into a new Array[string] column, but that one is about Array[String], whereas my question is about Array[Struct] and about appending to the array instead of merging. concat and array_union are not working because of the differing data types of these columns.
Been stuck on this problem for a while now. Would highly appreciate any help!
CodePudding user response:
I've tested the following in Databricks with Scala 2.12, Python 3.7 and Spark 3.2:
from pyspark.sql.types import StructType, StringType, ArrayType, IntegerType
import pyspark.sql.functions as f

arrayStructData = [
    ("James",   [("Java", "X", 120)], [("Scala", "A", 300)]),
    ("Michael", [("Java", "Y", 200)], [("Scala", "B", 500)]),
    ("Robert",  [("Java", "Z", 400)], [("Scala", "C", 250)])
]

# col1 and col2 share the same element struct: (name, author, pages)
arrayStructSchema = StructType() \
    .add("name", StringType()) \
    .add("col1", ArrayType(StructType()
        .add("name", StringType())
        .add("author", StringType())
        .add("pages", IntegerType()))) \
    .add("col2", ArrayType(StructType()
        .add("name", StringType())
        .add("author", StringType())
        .add("pages", IntegerType())))

df = spark.createDataFrame(data=arrayStructData, schema=arrayStructSchema)
# df.printSchema()
# df.display()

# concat appends the elements of col2 after those of col1
df.withColumn("col_merged", f.concat("col1", "col2")).show(truncate=False)
Output:
+-------+----------------+-----------------+---------------------------------+
|name   |col1            |col2             |col_merged                       |
+-------+----------------+-----------------+---------------------------------+
|James  |[{Java, X, 120}]|[{Scala, A, 300}]|[{Java, X, 120}, {Scala, A, 300}]|
|Michael|[{Java, Y, 200}]|[{Scala, B, 500}]|[{Java, Y, 200}, {Scala, B, 500}]|
|Robert |[{Java, Z, 400}]|[{Scala, C, 250}]|[{Java, Z, 400}, {Scala, C, 250}]|
+-------+----------------+-----------------+---------------------------------+
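Since the question itself is in Scala: the same concat built-in has worked on array columns since Spark 2.4, so the equivalent against the df from the question would be something like the following sketch (same approach, untested on the asker's exact setup):

import org.apache.spark.sql.functions.{col, concat}

// concat appends the elements of col2 after those of col1; it applies here
// because both columns share the same array<struct<...>> element type
df.withColumn("col_merged", concat(col("col1"), col("col2"))).show(false)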
It merges the array elements even when there are multiple elements in the array columns col1 and col2:
arrayStructData = [
    ("James",   [("Java", "X", 120), ("Java2", "XX", 130)], [("Scala", "A", 300), ("Scala2", "AA", 310)]),
    ("Michael", [("Java", "Y", 200)], [("Scala", "B", 500)]),
    ("Robert",  [("Java", "Z", 400)], [("Scala", "C", 250)])
]
Output:
+-------+----------------------------------+------------------------------------+----------------------------------------------------------------------+
|name   |col1                              |col2                                |col_merged                                                            |
+-------+----------------------------------+------------------------------------+----------------------------------------------------------------------+
|James  |[{Java, X, 120}, {Java2, XX, 130}]|[{Scala, A, 300}, {Scala2, AA, 310}]|[{Java, X, 120}, {Java2, XX, 130}, {Scala, A, 300}, {Scala2, AA, 310}]|
|Michael|[{Java, Y, 200}]                  |[{Scala, B, 500}]                   |[{Java, Y, 200}, {Scala, B, 500}]                                     |
|Robert |[{Java, Z, 400}]                  |[{Scala, C, 250}]                   |[{Java, Z, 400}, {Scala, C, 250}]                                     |
+-------+----------------------------------+------------------------------------+----------------------------------------------------------------------+
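One caveat: concat returns null as soon as any input array is null. If col1 or col2 can be null, something like the following sketch keeps the non-null side's elements (the empty-array fallback is an assumption about the desired behavior):

import org.apache.spark.sql.functions.{array, coalesce, col, concat}

// Substitute an empty array of the matching struct type for a null column,
// so a null on one side does not null out the whole merged array
val emptyArr = array().cast("array<struct<name:string,author:string,pages:int>>")
val merged = df.withColumn("col_merged",
  concat(coalesce(col("col1"), emptyArr), coalesce(col("col2"), emptyArr)))
merged.show(false)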
CodePudding user response:
You can define a UDF to combine the two columns.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Concatenate the two Seq[Row] inputs; the return schema must be passed
// explicitly because Spark cannot infer a schema for Row
val myUdf = udf((a: Seq[Row], b: Seq[Row]) => a ++ b,
  ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

df.withColumn("col_merged", myUdf(col("col1"), col("col2"))).show(false)
The output:
+-------+----------------+-----------------+---------------------------------+
|name   |col1            |col2             |col_merged                       |
+-------+----------------+-----------------+---------------------------------+
|James  |[[Java, X, 120]]|[[Scala, A, 300]]|[[Java, X, 120], [Scala, A, 300]]|
|Michael|[[Java, Y, 200]]|[[Scala, B, 500]]|[[Java, Y, 200], [Scala, B, 500]]|
|Robert |[[Java, Z, 400]]|[[Scala, C, 250]]|[[Java, Z, 400], [Scala, C, 250]]|
+-------+----------------+-----------------+---------------------------------+
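Note that this untyped udf(f, dataType) overload is deprecated and rejected by default on Spark 3.x unless spark.sql.legacy.allowUntypedScalaUDF is set to true. A sketch of a typed alternative that avoids it, mapping the input Rows onto a case class so the schema is inferred from the return type (the Entry name is illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Mirror the struct fields in a case class; Spark derives the return
// schema from Seq[Entry], so no explicit ArrayType is needed
case class Entry(name: String, author: String, pages: Int)

val mergeUdf = udf { (a: Seq[Row], b: Seq[Row]) =>
  (a ++ b).map(r => Entry(r.getString(0), r.getString(1), r.getInt(2)))
}

df.withColumn("col_merged", mergeUdf(col("col1"), col("col2"))).show(false)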