Background: I'm trying to get an event-time temporal join working with two 'large(r)' datasets/tables that are read from a CSV-file (16K rows in left table, somewhat less in right table). Both tables are append-only tables, i.e. their datasources are currently CSV-files, but will become CDC changelogs emitted by Debezium over Pulsar.
I am using the fairly new SYSTEM_TIME AS OF
syntax.
The problem: join results are only partly correct, i.e. at the start (first 20% or so) of the execution of the query, rows of the left-side are not matched with rows from the right side, while in theory, they should. After a couple of seconds, there are more matches, and by the time the query ends, rows of the left side are getting matched/joined correctly with rows of the right side. Every time that I run the query it shows other results in terms of which rows are (not) matched.
Both datasets are not ordered by their respective event-times. They are ordered by their primary key. So it's really this case, only with more data.
In essence, the right side is a lookup-table that changes over time, and we're sure that for every left record there was a matching right record, as both were created in the originating database at /- the same instant. Ultimately our goal is a dynamic materialized view that contains the same data as when we'd join the 2 tables in the CDC-enabled source database (SQL Server).
Obviously, I want to achieve a correct join over the complete dataset as explained in the Flink docs
Unlike simple examples and Flink test-code with a small dataset of only a few rows (like here), a join of larger datasets does not yield correct results.
I suspect that, when the probing/left table starts flowing, the build/right table is not yet 'in memory' which means that left rows don't find a matching right row, while they should -- if the right table would have started flowing somewhat earlier. That's why the left join
returns null-values for the columns of the right table.
I've included my code:
@Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {
private final String emr01Ddl =
"CREATE TABLE EMR01\n"
"(\n"
" SRC_NO STRING,\n"
" JRD_ETT_NO STRING,\n"
" STT_DT DATE,\n"
" MGT_SLT_DT DATE,\n"
" ATM_CRT_DT DATE,\n"
" LTD_MDT_IC STRING,\n"
" CPN_ORG_NO STRING,\n"
" PTY_NO STRING,\n"
" REG_USER_CD STRING,\n"
" REG_TS TIMESTAMP,\n"
" MUT_USER_CD STRING,\n"
" MUT_TS TIMESTAMP(3),\n"
" WATERMARK FOR MUT_TS AS MUT_TS,\n"
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n"
") WITH (\n"
" 'connector' = 'filesystem',\n"
" 'path' = '" getCsv1() "',\n"
" 'format' = 'csv'\n"
")";
private final String emr02Ddl =
"CREATE TABLE EMR02\n"
"(\n"
" CPN_ORG_NO STRING,\n"
" DSB_TX STRING,\n"
" REG_USER_CD STRING,\n"
" REG_TS TIMESTAMP,\n"
" MUT_USER_CD STRING,\n"
" MUT_TS TIMESTAMP(3),\n"
" WATERMARK FOR MUT_TS AS MUT_TS,\n"
" PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n"
") WITH (\n"
" 'connector' = 'filesystem',\n"
" 'path' = '" getCsv2() "',\n"
" 'format' = 'csv'\n"
")";
@Test
public void testEventTimeTemporalJoin() throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(emr01Ddl);
tableEnv.executeSql(emr02Ddl);
Table result = tableEnv.sqlQuery(""
"SELECT *"
" FROM EMR01"
" LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS"
" ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");
tableEnv.toChangelogStream(result).addSink(new TestSink());
env.execute();
System.out.println("[Count]" TestSink.values.size());
//System.out.println("[Row 1]" TestSink.values.get(0));
//System.out.println("[Row 2]" TestSink.values.get(1));
AtomicInteger i = new AtomicInteger();
TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row " i.incrementAndGet() " ]=" value));
}
private static class TestSink implements SinkFunction<Row> {
// must be static
public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Row value, SinkFunction.Context context) {
values.add(value);
}
}
String getCsv1() {
try {
return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
String getCsv2() {
try {
return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Is there a way to solve this? E.g. is there a way to FIRST load the right side into Flink state, and THEN start loading/streaming the left side? Would this be a good approach because this question begs: how much later? what is the time that the left side can start flowing?
We're using Flink 1.13.3.
CodePudding user response:
This sort of temporal/versioned join depends on having accurate watermarks. Flink relies on the watermarks to know which rows can safely be dropped from the state being maintained (because they can no longer affect the results).
The watermarking you've used indicates that the rows are ordered by MUT_TS
. Since this isn't true, the join isn't able to produce complete results.
To fix this, the watermarks should be defined with something like this
WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE
where the interval indicates how much out-of-orderness needs to be accommodated.