I'm trying to implement an event time temporal join in Flink. Here's the first join table:
tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka ("
"`timestamp` TIMESTAMP_LTZ(3),"
"`area` STRING,"
"`networkEdge` STRING,"
"`vehiclesNumber` BIGINT,"
"`averageSpeed` INTEGER,"
"WATERMARK FOR `timestamp` AS `timestamp`"
") WITH ("
"'connector' = 'kafka',"
"'topic' = 'seneca.trafficdata.aggregated',"
"'properties.bootstrap.servers' = 'localhost:9092',"
"'properties.group.id' = 'traffic-data-aggregation-job',"
"'format' = 'json',"
"'json.timestamp-format.standard' = 'ISO-8601'"
")");
The table is used as a sink for the following query:
Table aggregatedTrafficData = trafficData
    .window(Slide.over(lit(30).seconds())
        .every(lit(15).seconds())
        .on($("timestamp"))
        .as("w"))
    .groupBy($("w"), $("networkEdge"), $("area"))
    .select(
        $("w").end().as("timestamp"),
        $("area"),
        $("networkEdge"),
        $("plate").count().as("vehiclesNumber"),
        $("speed").avg().as("averageSpeed")
    );
Here's the other join table. I use Debezium to stream a Postgres table into Kafka:
tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka ("
"`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL,"
"`urn` STRING,"
"`flow_rate` INTEGER,"
"PRIMARY KEY(`urn`) NOT ENFORCED,"
"WATERMARK FOR `timestamp` AS `timestamp`"
") WITH ("
"'connector' = 'kafka',"
"'topic' = 'seneca.network.transport_network_edge',"
"'scan.startup.mode' = 'latest-offset',"
"'properties.bootstrap.servers' = 'localhost:9092',"
"'properties.group.id' = 'traffic-data-aggregation-job',"
"'format' = 'debezium-json',"
"'debezium-json.schema-include' = 'true'"
")");
Finally here's the temporal join:
Table transportNetworkCongestion = tEnv.sqlQuery("SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, "
    + "congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` FROM AggregatedTrafficData_Kafka "
    + "JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` "
    + "ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");
The problem I'm having is that the join works only for the first few seconds (after an update in the Postgres table), but I need to continuously join the first table with the Debezium one. Am I doing something wrong? Thanks, euks
CodePudding user response:
Temporal joins using the AS OF syntax you're using require the following (a minimal sketch follows the list):
- an append-only table with a valid event-time attribute
- an updating table with a primary key and a valid event-time attribute
- an equality predicate on the primary key
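For reference, here's a minimal, hypothetical sketch of the same pattern that satisfies all three requirements. The Orders/CurrencyRates tables, columns, and topics are made up; the connector options just mirror your setup:
// Requirement 1: an append-only table with an event-time attribute and a watermark
tEnv.executeSql("CREATE TABLE Orders ("
    + "`order_time` TIMESTAMP_LTZ(3),"
    + "`currency` STRING,"
    + "`amount` DECIMAL(10, 2),"
    + "WATERMARK FOR `order_time` AS `order_time`"
    + ") WITH ("
    + "'connector' = 'kafka',"
    + "'topic' = 'orders',"
    + "'properties.bootstrap.servers' = 'localhost:9092',"
    + "'format' = 'json'"
    + ")");

// Requirement 2: an updating (versioned) table with a primary key and an event-time attribute
tEnv.executeSql("CREATE TABLE CurrencyRates ("
    + "`update_time` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL,"
    + "`currency` STRING,"
    + "`rate` DECIMAL(10, 4),"
    + "PRIMARY KEY(`currency`) NOT ENFORCED,"
    + "WATERMARK FOR `update_time` AS `update_time`"
    + ") WITH ("
    + "'connector' = 'kafka',"
    + "'topic' = 'currency_rates',"
    + "'properties.bootstrap.servers' = 'localhost:9092',"
    + "'format' = 'debezium-json'"
    + ")");

// Requirement 3: the join condition is an equality on the versioned table's primary key
Table converted = tEnv.sqlQuery("SELECT o.`order_time`, o.`currency`, o.`amount` * r.`rate` AS `converted` "
    + "FROM Orders AS o "
    + "JOIN CurrencyRates FOR SYSTEM_TIME AS OF o.`order_time` AS r "
    + "ON o.`currency` = r.`currency`");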
When Flink SQL's temporal operators are applied to event time streams, watermarks play a critical role in determining when results are produced, and when the state is cleared.
When performing a temporal join:
- rows from the append-only table are buffered in Flink state until the current watermark of the join operator reaches their timestamps
- for the versioned table, for each key the latest version whose timestamp precedes the join operator's current watermark is kept in state, plus any versions from after the current watermark
- whenever the join operator's watermark advances, new results are produced, and state that's no longer relevant is cleared
The join operator tracks the watermarks it receives from its input channels, and its current watermark is always the minimum of these two watermarks. This is why your join stalls, and only makes progress when the flow_rate is updated.
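To make that concrete with made-up numbers: suppose the aggregation stream has watermarked up to 12:00:30, but the last flow_rate change (and therefore the last watermark from the Debezium-backed stream) was at 11:55:00. The join operator's watermark is the minimum of the two, so it sits at 11:55:00 and nothing newer is produced:
import java.time.Instant;

// Illustrative only: hypothetical values, not Flink API calls.
long trafficWatermark = Instant.parse("2022-05-01T12:00:30Z").toEpochMilli(); // advances with every window
long edgeWatermark    = Instant.parse("2022-05-01T11:55:00Z").toEpochMilli(); // frozen since the last flow_rate update
long joinWatermark    = Math.min(trafficWatermark, edgeWatermark);            // the join is pinned at 11:55:00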
One way to fix this would be to set the watermark for the TransportNetworkEdge_Kafka table like this:
"WATERMARK FOR `timestamp` AS " Watermark.MAX_WATERMARK
This will set the watermark for this table/stream to the largest possible value, which makes its watermarks irrelevant -- this stream's watermark will never be the smallest, so it will never hold back the join.
This will, however, have the drawback of making the join results non-deterministic.