Home > Mobile >  Appending two columns of type Array[struct] into a single column in a Scala spark DataFrame
Appending two columns of type Array[struct] into a single column in a Scala spark DataFrame

Time:08-04

I have two columns in a Spark scala DataFrame where each is an Array[Struct].

val arrayStructData = Seq(
      Row("James",List(Row("Java","X",120)),List(Row("Scala","A",300))),
      Row("Michael",List(Row("Java","Y",200)),List(Row("Scala","B",500))),
      Row("Robert",List(Row("Java","Z",400)),List(Row("Scala","C",250)))
)

    val arrayStructSchema = new StructType().add("name",StringType).
        add("col1",ArrayType(new StructType().
        add("name",StringType).
        add("author",StringType).
        add("pages",IntegerType))).
        add("col2",ArrayType(new StructType().
        add("name",StringType).
        add("author",StringType).
        add("pages",IntegerType)))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
    df.printSchema()
    df.show(false)

This is my input dataframe df:

 ------- ---------------- ----------------- 
|name   |col1            |col2             |
 ------- ---------------- ----------------- 
|James  |[[Java, X, 120]]|[[Scala, A, 300]]|
|Michael|[[Java, Y, 200]]|[[Scala, B, 500]]|
|Robert |[[Java, Z, 400]]|[[Scala, C, 250]]|
 ------- ---------------- ----------------- 

I am trying to concatenate the two columns into one. The output should look like this:

 ------- --------------------------------- 
|name   |col_merged                       |
 ------- --------------------------------- 
|James  |[[Java, X, 120], [Scala, A, 300]]|
|Michael|[[Java, Y, 200], [Scala, B, 500]]|
|Robert |[[Java, Z, 400], [Scala, C, 250]]|
 ------- --------------------------------- 

I found a similar post here: Merge two columns of type Array[string] into a new Array[string] column, but this is more about Array[String], whereas my question is about an Array[Struct] and appending to the Array instead of merging. concat and array_union are not working because of different data types of these columns.

Been stuck on this problem for a while now. Would highly appreciate any help!

CodePudding user response:

I've tested following in Databricks with Scala 2.12, Python 3.7 and Spark 3.2:

from pyspark.sql.types import StructType, StringType, ArrayType, IntegerType
import pyspark.sql.functions as f

arrayStructData = [
  ("James", [("Java","X",120)], [("Scala","A",300)]),
  ("Michael", [("Java","Y",200)], [("Scala","B",500)]),
  ("Robert", [("Java","Z",400)], [("Scala","C",250)])
]

arrayStructSchema = StructType(). \
  add("name", StringType()). \
  add("col1", ArrayType(StructType(). \
    add("name", StringType()). \
    add("author", StringType()). \
    add("pages", IntegerType()))). \
  add("col2", ArrayType(StructType(). \
    add("name", StringType()). \
    add("author", StringType()). \
    add("pages", IntegerType())))

df = spark.createDataFrame(data=arrayStructData, schema=arrayStructSchema)
# df.printSchema()
# df.display()

df.withColumn("col_merged", f.concat("col1", "col2")).show(truncate=False)

Output:

 ------- ---------------- ----------------- --------------------------------- 
|name   |col1            |col2             |col_merged                       |
 ------- ---------------- ----------------- --------------------------------- 
|James  |[{Java, X, 120}]|[{Scala, A, 300}]|[{Java, X, 120}, {Scala, A, 300}]|
|Michael|[{Java, Y, 200}]|[{Scala, B, 500}]|[{Java, Y, 200}, {Scala, B, 500}]|
|Robert |[{Java, Z, 400}]|[{Scala, C, 250}]|[{Java, Z, 400}, {Scala, C, 250}]|
 ------- ---------------- ----------------- --------------------------------- 

It merges array elements, even when there are multiple elements in array cols col1 and col2:

arrayStructData = [
  ("James", [("Java","X",120), ("Java2","XX",130)], [("Scala","A",300), ("Scala2","AA",310)]),
  ("Michael", [("Java","Y",200)], [("Scala","B",500)]),
  ("Robert", [("Java","Z",400)], [("Scala","C",250)])
]

Output:

 ------- ---------------------------------- ------------------------------------ ---------------------------------------------------------------------- 
|name   |col1                              |col2                                |col_merged                                                            |
 ------- ---------------------------------- ------------------------------------ ---------------------------------------------------------------------- 
|James  |[{Java, X, 120}, {Java2, XX, 130}]|[{Scala, A, 300}, {Scala2, AA, 310}]|[{Java, X, 120}, {Java2, XX, 130}, {Scala, A, 300}, {Scala2, AA, 310}]|
|Michael|[{Java, Y, 200}]                  |[{Scala, B, 500}]                   |[{Java, Y, 200}, {Scala, B, 500}]                                     |
|Robert |[{Java, Z, 400}]                  |[{Scala, C, 250}]                   |[{Java, Z, 400}, {Scala, C, 250}]                                     |
 ------- ---------------------------------- ------------------------------------ ---------------------------------------------------------------------- 

CodePudding user response:

You can try to define a udf function to combine two columns.

import org.apache.spark.sql.functions._
val myUdf = udf((a:Seq[Row], b:Seq[Row]) => {
      a    b
    }, ArrayType(StructType(new StructType().
      add("name",StringType).
      add("author",StringType).
      add("pages",IntegerType))))

df.withColumn("col_merged", myUdf(col("col1"), col("col2"))).show(false)

the output

 ------- ---------------- ----------------- --------------------------------- 
|name   |col1            |col2             |col_merged                       |
 ------- ---------------- ----------------- --------------------------------- 
|James  |[[Java, X, 120]]|[[Scala, A, 300]]|[[Java, X, 120], [Scala, A, 300]]|
|Michael|[[Java, Y, 200]]|[[Scala, B, 500]]|[[Java, Y, 200], [Scala, B, 500]]|
|Robert |[[Java, Z, 400]]|[[Scala, C, 250]]|[[Java, Z, 400], [Scala, C, 250]]|
 ------- ---------------- ----------------- --------------------------------- 
  • Related