Flink: RowRowConverter seems to fail for nested DataTypes

I am trying to load a complex JSON file (multiple different data types, nested objects/arrays, etc.) from my local filesystem, read it in as a source table using the filesystem connector, and convert each resulting Row to Flink's internal format with RowRowConverter (full code below).

However, for some strange reason, RowRowConverter::toInternal runs twice, and if I continue stepping through, execution eventually comes back to the converter, which is where the NullPointerException is thrown.

Example of the JSON (simplified to a single nested field for brevity), which I placed in my /src/main/resources folder:

{"discount":[670237.997082,634079.372133,303534.821218]}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.data.conversion.RowRowConverter
import org.apache.flink.table.types.FieldsDataType
import org.apache.flink.table.types.logical.RowType

import scala.collection.JavaConverters._

object ReadJsonNestedData {
  def main(args: Array[String]): Unit = {
    // setup
    val jsonResource = getClass.getResource("/NESTED.json")
    val jsonFilePath = jsonResource.getPath
    val tableName = "orders"
    val readJSONTable =
      s"""
         | CREATE TABLE $tableName (
         |  `discount` ARRAY<DECIMAL(12, 6)>
         | ) WITH (
         |    'connector' = 'filesystem',
         |    'path' = '$jsonFilePath',
         |    'format' = 'json'
         |)""".stripMargin

    val colFields = Array(
      "discount"
    )

    val defaultDataTypes = Array(
      DataTypes.ARRAY(DataTypes.DECIMAL(12, 6))
    )

    val rowType = RowType.of(defaultDataTypes.map(_.getLogicalType), colFields)
    val defaultDataTypesAsList = defaultDataTypes.toList.asJava
    val dataType = new FieldsDataType(rowType, defaultDataTypesAsList)
    val rowConverter = RowRowConverter.create(dataType)

    // Job
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv.executeSql(readJSONTable)
    val ordersTable = tableEnv.from(tableName)
    val dataStream = tableEnv
      .toDataStream(ordersTable)
      .map(row => rowConverter.toInternal(row)) // the NullPointerException is thrown inside toInternal here

    dataStream.print()
    env.execute()
  }

}

I would hence like to know:

  1. Why RowRowConverter is not working and how I can remedy it
  2. Why RowRowConverter::toInternal is running twice for the same Row, which may be the cause of that NullPointerException
  3. If my method of instantiating and using the RowRowConverter is correct based on my code above.

Thank you!

Environment:

  • IntelliJ 2021.3.2 (Ultimate)
  • AdoptOpenJDK 1.8
  • Scala: 2.12.15
  • Flink: 1.13.5
  • Flink Libraries Used (for this example):
    • flink-table-api-java-bridge
    • flink-table-planner-blink
    • flink-clients
    • flink-json

CodePudding user response:

The first call to RowRowConverter::toInternal comes from Flink's internal deep copy of the StreamRecord emitted by the table source; it is independent of the converter in your map function. The NullPointerException occurs because the RowRowConverter in the map function is never initialized: RowRowConverter::open must be called before toInternal can be used. Use a RichMapFunction instead and invoke RowRowConverter::open inside RichMapFunction::open, as shown in the answer below.

CodePudding user response:

Thank you to @renqs for the answer.

Here is the code, if anyone is interested.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.RowRowConverter
import org.apache.flink.table.types.FieldsDataType
import org.apache.flink.types.Row

class ConvertRowToRowDataMapFunction(fieldsDataType: FieldsDataType)
    extends RichMapFunction[Row, RowData] {
  private final val rowRowConverter = RowRowConverter.create(fieldsDataType)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // Initialize the converter on the task manager, after deserialization,
    // so its internal field converters are ready before the first map() call.
    rowRowConverter.open(this.getClass.getClassLoader)
  }

  override def map(row: Row): RowData =
    this.rowRowConverter.toInternal(row)
}

// in the main function
// ... continue from previous
val dataStream = tableEnv
  .toDataStream(ordersTable)
  .map(new ConvertRowToRowDataMapFunction(dataType))
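
To round out the snippet, the stream can then be printed and the job executed exactly as in the question's original main function (a minimal continuation; env and dataType are the values defined there):

dataStream.print()
env.execute()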