spark / scala : data type mismatch during nested values update

Time:07-29

I have an input dataframe like this:

+---------------------------------------------+
|infos                                        |
+---------------------------------------------+
|[{100, 1, foo}, {103, 1, bar}, {99, 0, null}]|
|[{101, 1, null}, {102, 1, null}]             |
|[]                                           |
+---------------------------------------------+

with this schema:

root
 |-- Infos: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- val: string (nullable = true)
 |    |    |-- com: string (nullable = true)

And I would like to replace the null values with empty strings:

  val nullToEmptyString: Row => Row = { row: Row =>
    // Recursively walk the row: rebuild nested Rows and Seqs,
    // replacing every null leaf with an empty string.
    def recursiveUpdate(value: Any): Any = value match {
      case r: Row      => Row.fromSeq(r.toSeq.map(recursiveUpdate))
      case seq: Seq[_] => seq.map(recursiveUpdate)
      case null        => ""
      case other       => other
    }
    Row.fromSeq(row.toSeq.map(recursiveUpdate))
  }
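For illustration, here is a stdlib-only sketch of what this recursion does, with Spark's Row swapped for plain nested Seqs (the sample values below are made up):

```scala
// Any null encountered at any nesting depth is replaced by "".
def recursiveUpdate(value: Any): Any = value match {
  case seq: Seq[_] => seq.map(recursiveUpdate)
  case null        => ""
  case other       => other
}

val sample = Seq(Seq("100", "1", null), Seq("101", null, null))
recursiveUpdate(sample)
// → Seq(Seq("100", "1", ""), Seq("101", "", ""))
```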

val outputDataSchema: StructType = StructType(
   StructField("compInfos",
      ArrayType(
         StructType(
            Seq(
               StructField("id", StringType, nullable = true),
               StructField("value", StringType, nullable = true),
               StructField("text", StringType, nullable = true)
            )
         )
      ), nullable = false)
)

val outputDf = spark.createDataFrame(inputDf.rdd.map{nullToEmptyString}, outputDataSchema)
outputDf.show(false)
outputDf.coalesce(1).write.format("parquet").save("/usr/samples/output/")

The show works perfectly:

+-----------------------------------------+
|infos                                    |
+-----------------------------------------+
|[{100, 1, foo}, {103, 1, bar}, {99, 0, }]|
|[{101, 1, }, {102, 1, }]                 |
|[]                                       |
+-----------------------------------------+

but when I try to write outputDf, I get this data type mismatch error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array<struct<id:string,val:string,com:string>>

I don't know exactly why this happens. Is there a better way to rewrite the nullToEmptyString function so that it handles empty struct types?

CodePudding user response:

SPECIFIC KEY

I am not really sure how to answer your question directly, but I can offer another way to do this without UDFs, which is better.

Assume your dataset is called main and looks as below

+---------------------------------+
|col                              |
+---------------------------------+
|[{null, 100, 100}, {foo, 100, 1}]|
|[{null, 100, 1}, {foo, 100, 2}]  |
+---------------------------------+

with the following schema

root
 |-- col: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- com: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- val: string (nullable = true)

What we can do is overwrite col with a transformation that replaces null with "", and otherwise leaves the object as is. The following line helps us do that:

main = main.withColumn("col", expr(
  "transform(col, x -> case when (x.com is null) then named_struct('com', '', 'id', x.id, 'val', x.val) else x end)"
))

To elaborate on this: if x.com is null, we create a named struct with the same attributes but overwrite com with "".
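As a plain-Scala sketch of the same idea (the Info case class and fillCom helper here are made up for illustration, not Spark API):

```scala
// Analogue of the SQL case-when above: if com is null,
// rebuild the struct with com = ""; otherwise keep it unchanged.
case class Info(com: String, id: String, value: String)

def fillCom(x: Info): Info =
  if (x.com == null) x.copy(com = "") else x

val rows = Seq(Info(null, "100", "100"), Info("foo", "100", "1"))
rows.map(fillCom)
// → Seq(Info("", "100", "100"), Info("foo", "100", "1"))
```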

The final output looks as below:

+-----------------------------+
|col                          |
+-----------------------------+
|[{, 100, 100}, {foo, 100, 1}]|
|[{, 100, 1}, {foo, 100, 2}]  |
+-----------------------------+

which is I hope what you want! Good luck!

MULTIPLE KEYS

Assume we have this dataset, df1:

+------------------------------------+
|col                                 |
+------------------------------------+
|[{1, 50, 100}, {2, null, 150}]      |
|[{null, null, null}, {2, null, 150}]|
+------------------------------------+

Now, we will create an array of the keys our struct has, map each one to a case when, and concatenate them into one single expression:

val keys = Array("id", "val", "com")
  .map(key => s"'$key', case when (x.$key is null) then '' else x.$key end")
  .mkString(", ")
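For reference, this is pure Scala string-building, so it can be checked outside Spark; the generated expression expands like this:

```scala
// Build one "'key', case when ... end" fragment per field,
// joined into the argument list for named_struct.
val keys = Array("id", "val", "com")
  .map(key => s"'$key', case when (x.$key is null) then '' else x.$key end")
  .mkString(", ")

println(keys)
// 'id', case when (x.id is null) then '' else x.id end,
// 'val', case when (x.val is null) then '' else x.val end,
// 'com', case when (x.com is null) then '' else x.com end
```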

Then finally, we can put that into transform as follows:

df1 = df1.withColumn("correct", expr(s"transform(col, x -> named_struct($keys))"))

and we get:

+------------------------------------+--------------------------+
|col                                 |correct                   |
+------------------------------------+--------------------------+
|[{1, 50, 100}, {2, null, 150}]      |[{1, 100, 50}, {2, 150, }]|
|[{null, null, null}, {2, null, 150}]|[{, , }, {2, 150, }]      |
+------------------------------------+--------------------------+

which is I hope what you need!
