How to write to an S3 table sink in Flink without the "update and delete changes" error?


Consider a code:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

class Scratch {
  public static void main(String[] args) {

    StreamTableEnvironment tableEnv = /*some init code here*/;

    tableEnv.executeSql("CREATE TABLE my_table (\n"  
        "                                  id STRING,\n"  
        "                                  createdDate DATE,\n"  
        "                                  `date` STRING "  
        "                                ) PARTITIONED BY (`date`) \n"  
        "                                WITH (\n"  
        "                                  'connector' = 'filesystem',\n"  
        "                                  'path' = 's3://my-bucket/',\n"  
        "                                  'format' = 'json'\n"  
        "                                )");

    tableEnv.executeSql("CREATE TABLE output_table  (\n"  
        "  id STRING,\n"  
        "  created_date DATE,\n"  
        "  count_value BIGINT,\n"  
        "  PRIMARY KEY (id, created_date) NOT ENFORCED\n"  
        ") WITH (\n"  
        "   'connector' = 'filesystem', \n"  
        "   'path' = 's3://some-bucket/output-table/',\n"  
        "   'format' = 'json'\n"  
        " )");
    Table temp = tableEnv.sqlQuery(
        " SELECT id as id, "  
            " max(createdDate) as created_date, "  
            " COUNT(DISTINCT(id)) as count_value  "  
            " from my_table\n"  
            "    GROUP BY createdDate, id"
    );
    temp.executeInsert("output_table");

  }
}

This gives me the following error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'default_catalog.default_database.output_table' doesn't support consuming update changes which is produced by node GroupAggregate(select=[MIN($f0) AS id, MAX(createdDate) AS created_date, COUNT(DISTINCT $f2) AS count_value ])

Is there a way to write an aggregation to S3 via Flink? (Flink is run in batch mode.)

CodePudding user response:

As it is, you are running the query in streaming mode, which requires a sink that can handle the update and delete changes coming from the aggregation.

This will work if you either

  • produce the results in a CDC (changelog) format, such as Debezium (see the sketch after this list),
  • or run the job in batch mode.
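For the CDC route, one common option is to write the changelog to Kafka using the debezium-json format, which encodes the update and delete rows that the aggregation produces, so the sink can consume them. A minimal sketch, assuming a hypothetical topic output-topic and a broker at localhost:9092:

    // Kafka sink that accepts update/delete changes because the
    // debezium-json format encodes them as changelog records.
    // Topic name and broker address are placeholder assumptions.
    tableEnv.executeSql("CREATE TABLE output_table (\n" +
        "  id STRING,\n" +
        "  created_date DATE,\n" +
        "  count_value BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'output-topic',\n" +
        "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'format' = 'debezium-json'\n" +
        ")");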

To run in batch mode, you can do this:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);
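In batch mode the aggregation emits each result exactly once, as a plain insert, so an append-only sink such as the filesystem connector can consume it without ever seeing update or delete changes.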

If you need to use the Table API in batch execution mode while also having access to the DataStream API, note that this combination is only possible since Flink 1.14; a sketch of that setup follows.
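Since Flink 1.14, a StreamTableEnvironment can run in batch mode by switching the runtime mode on the underlying StreamExecutionEnvironment before creating the table environment, roughly like this:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Switch the DataStream runtime to batch execution; the table
    // environment created from it inherits this mode (Flink 1.14+).
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);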
