Consider this code:
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
class Scratch {
    public static void main(String[] args) {
        StreamTableEnvironment tableEnv = /*some init code here*/;
        tableEnv.executeSql("CREATE TABLE my_table (\n"
                + "  id STRING,\n"
                + "  createdDate DATE,\n"
                + "  `date` STRING\n"
                + ") PARTITIONED BY (`date`)\n"
                + "WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = 's3://my-bucket/',\n"
                + "  'format' = 'json'\n"
                + ")");
        tableEnv.executeSql("CREATE TABLE output_table (\n"
                + "  id STRING,\n"
                + "  created_date DATE,\n"
                + "  count_value BIGINT,\n"
                + "  PRIMARY KEY (id, created_date) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = 's3://some-bucket/output-table/',\n"
                + "  'format' = 'json'\n"
                + ")");
        Table temp = tableEnv.sqlQuery(
                "SELECT id AS id,\n"
                + "  max(createdDate) AS created_date,\n"
                + "  COUNT(DISTINCT(id)) AS count_value\n"
                + "FROM my_table\n"
                + "GROUP BY createdDate, id");
        temp.executeInsert("output_table");
    }
}
This gives me the following error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'default_catalog.default_database.output_table' doesn't support consuming update changes which is produced by node GroupAggregate(select=[MIN($f0) AS id, MAX(createdDate) AS created_date, COUNT(DISTINCT $f2) AS count_value ])
Is there a way to write an aggregation to S3 via Flink? (Flink is run in batch mode.)
CodePudding user response:
As it is, you are running the query in streaming mode, which requires a sink that can handle the updates and deletes coming from the aggregation.
This will work if you either
- produce the results in a CDC (changelog) format, such as Debezium (see the sketch after this list),
- or run the job in batch mode.
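For the first option, here is a rough sketch of what the sink could look like; the topic name and broker address are placeholders, and the filesystem sink is swapped for Kafka, since (as the error says) the filesystem sink can't consume update changes:
// hypothetical Kafka changelog sink; topic and bootstrap servers are placeholders
tableEnv.executeSql("CREATE TABLE output_table (\n"
        + "  id STRING,\n"
        + "  created_date DATE,\n"
        + "  count_value BIGINT\n"
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'output-topic',\n"
        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
        // debezium-json encodes the inserts, updates, and deletes as changelog records
        + "  'format' = 'debezium-json'\n"
        + ")");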
To run in batch mode, you can do this:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// create a TableEnvironment that executes in batch mode
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inBatchMode()
        .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);
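With the TableEnvironment created this way, the rest of your program (the two CREATE TABLE statements, the query, and executeInsert) runs unchanged as a bounded job: the aggregation produces only final, insert-only results, which the filesystem sink can accept.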
Note that using the Table API in batch execution mode while also having access to the DataStream API is only possible since Flink 1.14.
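In that case, a minimal sketch (assuming Flink 1.14 or later) could look like this:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// set up the DataStream API environment and switch it to batch execution
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// the bridged StreamTableEnvironment then also runs in batch mode (Flink 1.14+)
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);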