I have a problem I was unable to solve when working with Scala Spark (or PySpark): how can we merge two fields that are arrays of structs with different fields?
For example, if I have a schema like this:
df.printSchema()
root
|-- arrayOne: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
| | |-- Q: string (nullable = true)
|-- ArrayTwo: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- x: string (nullable = true)
| | |-- y: string (nullable = true)
| | |-- z: string (nullable = true)
| | |-- Q: string (nullable = true)
Can I create a DataFrame with the following schema using a UDF?
df.printSchema()
root
|-- arrayOne: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
| | |-- Q: string (nullable = true)
|-- ArrayTwo: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- x: string (nullable = true)
| | |-- y: string (nullable = true)
| | |-- z: string (nullable = true)
| | |-- Q: string (nullable = true)
|-- ArrayThree: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
| | |-- Q: string (nullable = true)
| | |-- x: string (nullable = true)
| | |-- y: string (nullable = true)
| | |-- z: string (nullable = true)
In ArrayThree, when a, b, c are non-null, x, y, z are null, and vice versa. Q, however, is non-null in both cases and has the same value in both arrays.
The UDF is an important aspect here, because exploding both fields (with explode_outer) would:
- be too expensive
- repeat the elements of the second array, which would corrupt the fidelity of the data because of the element Q.
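The repetition concern can be illustrated without Spark. The following is a minimal sketch in plain Python, assuming the two columns are exploded one after the other; the sample values and the variable names (array_one, array_two) are hypothetical, and the structs are modelled as dicts:

```python
from itertools import product

# Hypothetical sample row: two arrays of structs, modelled as lists of dicts.
array_one = [
    {"a": "a1", "b": "b1", "c": "c1", "Q": "q"},
    {"a": "a2", "b": "b2", "c": "c2", "Q": "q"},
]
array_two = [
    {"x": "x1", "y": "y1", "z": "z1", "Q": "q"},
    {"x": "x2", "y": "y2", "z": "z2", "Q": "q"},
    {"x": "x3", "y": "y3", "z": "z3", "Q": "q"},
]

# Exploding one array and then the other pairs every element of the first
# with every element of the second, like a Cartesian product.
exploded_rows = list(product(array_one, array_two))

# Concatenating the arrays keeps each element exactly once.
concatenated = array_one + array_two

print(len(exploded_rows))  # 6 rows (2 * 3), each second-array element repeated
print(len(concatenated))   # 5 elements (2 + 3), no repetition
```

With arrays of length m and n, the exploded form grows as m * n while the concatenated form grows as m + n, which is why a row-wise merge is preferable here.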
Writing such a UDF in Pig Latin or even plain MapReduce would be very easy, but for some reason I find it very complicated in Spark.
How can I write a UDF that concatenates the two arrays and creates a new struct containing the superset of the fields of the two different structs?
CodePudding user response:
Here's a sample test I did. I created 2 fields of Array(Struct()): arr_struct1 and arr_struct2. Using them, I created the new field arr_struct12, which has all elements of the previous 2 array-struct fields. I've retained all columns in the printSchema() output for a better understanding.
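from pyspark.sql import functions as func

# data_sdf is an existing DataFrame with long columns a to g (see the schema below)
data_sdf. \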
withColumn('arr_struct1', func.array(func.struct(func.col('a').alias('a'), func.col('b').alias('b'), func.col('c').alias('c')))). \
withColumn('arr_struct2', func.array(func.struct(func.col('e').alias('e'), func.col('f').alias('f')))). \
withColumn('struct1', func.col('arr_struct1')[0]). \
withColumn('struct2', func.col('arr_struct2')[0]). \
withColumn('arr_struct12', func.array(func.struct('struct1.*', 'struct2.*'))). \
printSchema()
# ignore columns a to g in the schema below
# root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- c: long (nullable = true)
# |-- d: long (nullable = true)
# |-- e: long (nullable = true)
# |-- f: long (nullable = true)
# |-- g: long (nullable = true)
# |-- arr_struct1: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- a: long (nullable = true)
# | | |-- b: long (nullable = true)
# | | |-- c: long (nullable = true)
# |-- arr_struct2: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- e: long (nullable = true)
# | | |-- f: long (nullable = true)
# |-- struct1: struct (nullable = true)
# | |-- a: long (nullable = true)
# | |-- b: long (nullable = true)
# | |-- c: long (nullable = true)
# |-- struct2: struct (nullable = true)
# | |-- e: long (nullable = true)
# | |-- f: long (nullable = true)
# |-- arr_struct12: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- a: long (nullable = true)
# | | |-- b: long (nullable = true)
# | | |-- c: long (nullable = true)
# | | |-- e: long (nullable = true)
# | | |-- f: long (nullable = true)
In case you'd like to specify which elements to keep, you can select them with col('col_name.element_alias') instead of the *.
data_sdf. \
withColumn('arr_struct1', func.array(func.struct(func.col('a').alias('a'), func.col('b').alias('b'), func.col('c').alias('c')))). \
withColumn('arr_struct2', func.array(func.struct(func.col('e').alias('e'), func.col('f').alias('f')))). \
withColumn('struct1', func.col('arr_struct1')[0]). \
withColumn('struct2', func.col('arr_struct2')[0]). \
withColumn('arr_struct12',
func.array(func.struct(func.col('struct1.a').alias('a'),
func.col('struct1.b').alias('b'),
func.col('struct2.f').alias('f')
)
)
). \
printSchema()
# ignore columns a to g in the schema below
# root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- c: long (nullable = true)
# |-- d: long (nullable = true)
# |-- e: long (nullable = true)
# |-- f: long (nullable = true)
# |-- g: long (nullable = true)
# |-- arr_struct1: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- a: long (nullable = true)
# | | |-- b: long (nullable = true)
# | | |-- c: long (nullable = true)
# |-- arr_struct2: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- e: long (nullable = true)
# | | |-- f: long (nullable = true)
# |-- struct1: struct (nullable = true)
# | |-- a: long (nullable = true)
# | |-- b: long (nullable = true)
# | |-- c: long (nullable = true)
# |-- struct2: struct (nullable = true)
# | |-- e: long (nullable = true)
# | |-- f: long (nullable = true)
# |-- arr_struct12: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- a: long (nullable = true)
# | | |-- b: long (nullable = true)
# | | |-- f: long (nullable = true)
CodePudding user response:
Below is the solution that worked for me. It is a simple UDF that takes the two arrays of structs as input and returns a sequence of a new struct whose fields are the superset of the fields of the two input structs, as required:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import scala.collection.mutable.ListBuffer

case class ItemOne(a: String,
                   b: String,
                   c: String,
                   Q: String)
case class ItemTwo(x: String,
                   y: String,
                   z: String,
                   Q: String)
case class ItemThree(a: String,
                     b: String,
                     c: String,
                     x: String,
                     y: String,
                     z: String,
                     Q: String)

val combineAuctionData = udf((arrayOne: Seq[Row], arrayTwo: Seq[Row]) => {
  val result = new ListBuffer[ItemThree]()
  // Convert every ItemOne-shaped row to an ItemThree; x, y, z stay null
  for (el <- arrayOne) {
    result += ItemThree(el.getString(0),
                        el.getString(1),
                        el.getString(2),
                        null,
                        null,
                        null,
                        el.getString(3))
  }
  // Convert every ItemTwo-shaped row to an ItemThree; a, b, c stay null
  for (el <- arrayTwo) {
    result += ItemThree(null,
                        null,
                        null,
                        el.getString(0),
                        el.getString(1),
                        el.getString(2),
                        el.getString(3))
  }
  // Return the accumulated ItemThree values as a Seq
  result.toSeq
})

// Example usage with the column names from the question:
// df.withColumn("ArrayThree", combineAuctionData(df("arrayOne"), df("ArrayTwo")))
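For PySpark readers, the per-row logic of the UDF above can be sketched in plain Python. This is only an illustration under assumptions: the function name merge_struct_arrays and the FIELDS tuple are hypothetical, the structs are modelled as dicts, and the field order follows the ArrayThree schema from the question. Inside a real PySpark UDF the elements arrive as Row objects, so they would first need converting (for example with Row.asDict()), and the UDF would need an explicit array-of-struct return type.

```python
# Field order taken from the ArrayThree schema in the question.
FIELDS = ("a", "b", "c", "Q", "x", "y", "z")


def merge_struct_arrays(array_one, array_two):
    """Concatenate two lists of dicts, filling fields a dict lacks with None."""
    merged = []
    # A null (None) array is treated as an empty one.
    for element in list(array_one or []) + list(array_two or []):
        merged.append({field: element.get(field) for field in FIELDS})
    return merged


# Hypothetical sample data for one row.
array_one = [{"a": "a1", "b": "b1", "c": "c1", "Q": "q"}]
array_two = [{"x": "x1", "y": "y1", "z": "z1", "Q": "q"}]

array_three = merge_struct_arrays(array_one, array_two)
for element in array_three:
    print(element)
```

Each input element appears exactly once in the result, so the output has len(array_one) + len(array_two) elements and Q is carried over unchanged from whichever array the element came from.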