Join nested dataframes Spark Scala


I have the following dataframes:

Dataframe1:

+----+------------------------------------------------------------------+
|id  |records                                                           |
+----+------------------------------------------------------------------+
|473 |[{1, 8414932001257, 34301, 70.0}, {2, 015878529935, 34301, 140.0}]|
|1529|[{1, 54490802, 34301, 70.0}]                                      |
|2052|[{1, 016229901172, 34301, 70.0}, {1, 8410793206138, 34304, 40.0}] |
|894 |[{1, 8429359001223, 34301, 70.0}]                                 |
|2053|[{1, 8480012007242, 34304, 40.0}, {1, 8420030011050, 34301, 70.0}]|
+----+------------------------------------------------------------------+

Whose schema is:

StructType(
    StructField(id,LongType,true), 
    StructField(records,ArrayType(
        StructType(
            StructField(count,LongType,true), 
            StructField(barcode,StringType,true), 
            StructField(height,LongType,true), 
            StructField(weight,DoubleType,true)),
        true),
    true)
)

Dataframe2:

+------------+--------+
|     barcode|itemType|
+------------+--------+
|015878529935|Box     |
|015878539989|Box     |
|016229901141|Can     |
|016229901172|Box     |
|016229901189|Can     |
+------------+--------+

Whose schema is:

StructType(
    StructField(barcode,StringType,true), 
    StructField(itemType,StringType,true)
)
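
For reproduction, the two dataframes can be built like this (a sketch using case classes so the struct fields get the right names, assuming spark.implicits._ is in scope; nullability flags may differ slightly):

case class Record(count: Long, barcode: String, height: Long, weight: Double)

val dataframe1 = Seq(
  (473L,  Seq(Record(1, "8414932001257", 34301, 70.0), Record(2, "015878529935", 34301, 140.0))),
  (1529L, Seq(Record(1, "54490802", 34301, 70.0))),
  (2052L, Seq(Record(1, "016229901172", 34301, 70.0), Record(1, "8410793206138", 34304, 40.0))),
  (894L,  Seq(Record(1, "8429359001223", 34301, 70.0))),
  (2053L, Seq(Record(1, "8480012007242", 34304, 40.0), Record(1, "8420030011050", 34301, 70.0)))
).toDF("id", "records")

val dataframe2 = Seq(
  ("015878529935", "Box"),
  ("015878539989", "Box"),
  ("016229901141", "Can"),
  ("016229901172", "Box"),
  ("016229901189", "Can")
).toDF("barcode", "itemType")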

I would like to join the two dataframes on barcode so that each element of Dataframe1's records array gains the itemType column from Dataframe2, giving a schema like:

StructType(
    StructField(id,LongType,true), 
    StructField(records,ArrayType(
        StructType(
            StructField(count,LongType,true), 
            StructField(barcode,StringType,true), 
            StructField(height,LongType,true), 
            StructField(weight,DoubleType,true), 
            StructField(itemType,StringType,true)),
        true),
    true)
)

I have tried the following code:

dataframe1.join(dataframe2, dataframe1("records.barcode") === dataframe2("barcode"), "leftouter")

Result:
AnalysisException: cannot resolve '(spark_catalog.database.table1.`records`.`barcode` = spark_catalog.database.table2.`barcode`)' due to data type mismatch: differing types in '(spark_catalog.database.table1.`records`.`barcode` = spark_catalog.database.table2.`barcode`)' (array<string> and string).;

Logically, it fails because records is an array of structs, so records.barcode resolves to an array<string> while Dataframe2's barcode is a plain StringType. My problem is that I don't know how to access records.barcode for each element of the array.
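
A quick way to see the mismatch (assuming spark.implicits._ is in scope):

// Field extraction through an array of structs yields array<string>,
// not string, which is exactly what the exception is complaining about.
dataframe1.select($"records.barcode").printSchema()
// root
//  |-- barcode: array (nullable = true)
//  |    |-- element: string (containsNull = true)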

CodePudding user response:

Your code is almost right; you just have to make one small change for it to work as expected.

import org.apache.spark.sql.functions.explode_outer

// records is an array column, so to access barcode you first have to
// explode the array (one row per element), then join with the other dataframe.
dataframe1
  .withColumn("records", explode_outer($"records"))
  .join(
    dataframe2,
    $"records.barcode" === dataframe2("barcode"),
    "leftouter"
  )
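
Note that the join above leaves the result flattened, one row per array element. To get back the nested shape you described, with itemType inside each record, you can regroup by id afterwards (a sketch using collect_list and struct, assuming spark.implicits._ is in scope and the height field name from your first schema):

import org.apache.spark.sql.functions.{collect_list, explode_outer, struct}

val renested = dataframe1
  .withColumn("records", explode_outer($"records"))
  .join(dataframe2, $"records.barcode" === dataframe2("barcode"), "leftouter")
  .groupBy($"id")
  // Rebuild the array of structs, now carrying itemType in each element.
  .agg(collect_list(struct(
    $"records.count",
    $"records.barcode",
    $"records.height",
    $"records.weight",
    $"itemType"
  )).as("records"))

Two caveats: collect_list does not guarantee element order, and an id whose records array was empty comes back as a one-element array of all-null fields (explode_outer emits a null row for it) rather than an empty array.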