Home > Mobile >  Apache Spark : Reference and transform a field within an array of struct
Apache Spark : Reference and transform a field within an array of struct

Time:01-19

Let's say that we have the following dataset :

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

case class Rec3(i: Long, j: Boolean)
case class Rec2(a: Int, b:Rec3, c: String, d: Int)
case class Rec1(x:Int, y:Option[Seq[Rec2]], z:Boolean, zz: String)

val df = Seq(Rec1(5, Some(Seq(Rec2(4, Rec3(3L, true), "2022-09-22 13:00:00", 3), Rec2(44, Rec3(33L, true), "2022-11-11 22:11:00", 3))), false, "2022-09-23 14:30:00"), Rec1(55, Some(Seq(Rec2(44, Rec3(33L, false), "2023-01-11 21:00:00", 33))), true, "2023-01-22 11:33:00")).toDF

df.show(false)
 --- ---------------------------------------------------------------------------------- ----- ------------------- 
|x  |y                                                                                 |z    |zz                 |
 --- ---------------------------------------------------------------------------------- ----- ------------------- 
|5  |[{4, {3, true}, 2022-09-22 13:00:00, 3}, {44, {33, true}, 2022-11-11 22:11:00, 3}]|false|2022-09-23 14:30:00|
|55 |[{44, {33, false}, 2023-01-11 21:00:00, 33}]                                      |true |2023-01-22 11:33:00|
 --- ---------------------------------------------------------------------------------- ----- ------------------- 

df.printSchema
root
 |-- x: integer (nullable = false)
 |-- y: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: struct (nullable = true)
 |    |    |    |-- i: long (nullable = false)
 |    |    |    |-- j: boolean (nullable = false)
 |    |    |-- c: string (nullable = true)
 |    |    |-- d: integer (nullable = false)
 |-- z: boolean (nullable = false)
 |-- zz: string (nullable = true)

And I want to convert only the field y.c into Timestamp. This is the schema i want to get :

root
 |-- x: integer (nullable = false)
 |-- y: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: struct (nullable = true)
 |    |    |    |-- i: long (nullable = false)
 |    |    |    |-- j: boolean (nullable = false)
 |    |    |-- c: timestamp (nullable = true)
 |    |    |-- d: integer (nullable = false)
 |-- z: boolean (nullable = false)
 |-- zz: string (nullable = true)

This is what i tried so far :

df.withColumn(
  "y",
  transform(
    col("y"),
    elem => elem.withField(
      "c",
      unix_timestamp(
        col("y.c"),
        "yyyy-MM-dd' 'HH:mm:ss"
      )
    )
  )
) 

org.apache.spark.sql.AnalysisException: cannot resolve 'unix_timestamp(y.c, 'yyyy-MM-dd' 'HH:mm:ss')' due to data type mismatch: argument 1 requires (string or date or timestamp or timestamp_ntz) type, however, 'y.c' is of array type.; 'Project [x#4, transform(y#5, lambdafunction(update_fields(lambda x_0#35, WithField(c, unix_timestamp(y#5.c, yyyy-MM-dd' 'HH:mm:ss, Some(Europe/Paris), false))), lambda x_0#35, false)) AS y#34, z#6, zz#7] - LocalRelation [x#4, y#5, z#6, zz#7]

Now it's clear for me that referencing any field within an array, will return an array of that field, like this :

df.select("y.c")
res6: org.apache.spark.sql.DataFrame = [c: array<string>]

But i cannot find any elegant way to reference and transform this kind of fields, any idea ?

CodePudding user response:

It was possible by using the reference of every element of the array defined in the lambda function, as the following :

val out = df
  .withColumn("y",transform(col("y"),
     elem => elem.withField("c",unix_timestamp(elem("c"),
     "yyyy-MM-dd' 'HH:mm:ss").cast(TimestampType))))

out.show(false)
 --- ---------------------------------------------------------------------------------- ----- ------------------- 
|x  |y                                                                                 |z    |zz                 |
 --- ---------------------------------------------------------------------------------- ----- ------------------- 
|5  |[{4, {3, true}, 2022-09-22 13:00:00, 3}, {44, {33, true}, 2022-11-11 22:11:00, 3}]|false|2022-09-23 14:30:00|
|55 |[{44, {33, false}, 2023-01-11 21:00:00, 33}]                                      |true |2023-01-22 11:33:00|
 --- ---------------------------------------------------------------------------------- ----- ------------------- 

out.printSchema
root
 |-- x: integer (nullable = false)
 |-- y: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: struct (nullable = true)
 |    |    |    |-- i: long (nullable = false)
 |    |    |    |-- j: boolean (nullable = false)
 |    |    |-- c: timestamp (nullable = true)
 |    |    |-- d: integer (nullable = false)
 |-- z: boolean (nullable = false)
 |-- zz: string (nullable = true)

So my error was referencing df("c") in the transform function, but i have to use elem("c") to transform each element of the array.

  • Related