I am trying to test a stream-stream join with TopologyTestDriver. My goal is to confirm, without running external services, that my topology performs the following left join correctly.
bills
  .leftJoin(payments)(
    {
      case (billValue, null)         => billValue
      case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
  )
  .to("debt")
In other words, if a bill and a payment with the same key arrive within 100 ms of each other, the payment should be subtracted from the bill. If no payment arrives within that window, the debt is simply the bill.
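To make the intended result concrete, here is a small in-memory model of it in plain Scala. It is only a sketch under simplifying assumptions, not Kafka code: Event and LeftJoinModel are hypothetical names, the 100 ms window is treated as symmetric and inclusive on both ends (my understanding of how JoinWindows behaves), and it computes the final answer over a finite set of records, ignoring when Kafka would actually emit each result.

```scala
// Hypothetical in-memory model of the intended left-join result (not Kafka code).
final case class Event(key: String, value: String, timestampMs: Long)

object LeftJoinModel {
  val windowMs = 100L

  def debts(bills: Seq[Event], payments: Seq[Event]): Seq[(String, String)] =
    bills.flatMap { bill =>
      // A payment matches if it has the same key and lies at most windowMs
      // before or after the bill (assumed symmetric, inclusive window).
      val matches = payments.filter { p =>
        p.key == bill.key && math.abs(p.timestampMs - bill.timestampMs) <= windowMs
      }
      if (matches.isEmpty) Seq(bill.key -> bill.value) // no payment: whole bill is owed
      else matches.map(p => bill.key -> (bill.value.toInt - p.value.toInt).toString)
    }
}
```

With the records from the test below, fred owes 100 - 95 = 5 and george still owes 20.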
Here is the test code.
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde
val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)
bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")
// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)
val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  // This record is present
  new KeyValue[String, String]("fred", "5"),
  // This record is missing
  new KeyValue[String, String]("george", "20")
)
The full code is available at https://github.com/Oduig/kstreams-left-join-example
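For readers who do not open the repository, this is one way the SimpleLeftJoinTopology class used by the test might look. It is a sketch, not the repository's actual code, and it assumes Kafka 3.x with the kafka-streams-scala module (where org.apache.kafka.streams.scala.serialization.Serdes provides the implicit String serde).

```scala
import java.time.Duration
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._

// Sketch of the class under test; the real one lives in the linked repository.
class SimpleLeftJoinTopology {
  val topology: Topology = {
    val builder = new StreamsBuilder
    // Implicit String serdes supply the Consumed/StreamJoined/Produced instances.
    val bills: KStream[String, String] = builder.stream[String, String]("bills")
    val payments: KStream[String, String] = builder.stream[String, String]("payments")

    bills
      .leftJoin(payments)(
        {
          // No payment within the window: the whole bill is still owed.
          case (billValue, null)         => billValue
          case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
        },
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
      )
      .to("debt")

    builder.build()
  }
}
```

Printing new SimpleLeftJoinTopology().topology.describe() is a quick way to check that both source topics and the debt sink are wired up.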
Is it possible to test a left join this way?
Answer:
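I found out what the problem was. Kafka Streams works with "stream time", which is the largest event time (record timestamp) it has seen so far, so it only moves forward when a record with a later timestamp is processed. The event time can come from several sources:
- It can be set explicitly on the record by the producer.
- At runtime, if the producer does not set it, the producer client stamps the record with its own wall-clock time when sending it (if the topic uses LogAppendTime, the broker overwrites it with its append time instead).
- At test time, if the event time is not set, the TestInputTopic uses a fixed timestamp (by default, the time at which the input topic was created), which can optionally be advanced after every record by passing an autoAdvance duration to createInputTopic.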
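The test-time behaviour can be seen with a trivial pass-through topology, sketched below under the assumption of Kafka 3.x and Scala 2.13+ (AutoAdvanceDemo and the topics "in" and "out" are hypothetical). It uses the createInputTopic overload that takes a start timestamp and an autoAdvance duration, plus TestInputTopic.advanceTime to jump the topic's clock forward.

```scala
import java.time.{Duration, Instant}
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.serialization.{Serdes, StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsBuilder, TopologyTestDriver}
import org.apache.kafka.streams.kstream.{Consumed, Produced}

object AutoAdvanceDemo {
  // Returns each output key with its timestamp relative to t0, in milliseconds.
  def run(): Seq[(String, Long)] = {
    // Pass-through topology: records keep their input timestamps.
    val builder = new StreamsBuilder
    builder
      .stream("in", Consumed.`with`(Serdes.String(), Serdes.String()))
      .to("out", Produced.`with`(Serdes.String(), Serdes.String()))

    val driver = new TopologyTestDriver(builder.build())
    try {
      val t0 = Instant.parse("2024-01-01T00:00:00Z")
      // Start the topic clock at t0 and add 50 ms after every piped record.
      val in = driver.createInputTopic("in", new StringSerializer, new StringSerializer, t0, Duration.ofMillis(50))
      val out = driver.createOutputTopic("out", new StringDeserializer, new StringDeserializer)

      in.pipeInput("a", "1")                 // stamped t0; clock moves to t0 + 50 ms
      in.pipeInput("b", "2")                 // stamped t0 + 50 ms; clock moves to t0 + 100 ms
      in.advanceTime(Duration.ofMillis(200)) // clock jumps to t0 + 300 ms
      in.pipeInput("c", "3")                 // stamped t0 + 300 ms

      out.readRecordsToList().asScala.toSeq.map { r =>
        r.key -> (r.getRecordTime.toEpochMilli - t0.toEpochMilli)
      }
    } finally driver.close()
  }
}
```

Note that the timestamp is taken before the automatic advance, so the first record gets exactly the start time.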
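My code worked at runtime because freshly produced records carry ever-increasing wall-clock timestamps, so stream time keeps moving forward. In the test, every record got the same timestamp, so stream time was frozen and the join window for "george" was never closed. A left join defined with ofTimeDifferenceWithNoGrace only emits an unmatched left record such as ("george", "20") once its window has closed, so that record never appeared. Neither driver.advanceWallClockTime nor Thread.sleep changes this, because stream time depends only on record timestamps.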
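The mechanism can be illustrated with a toy model in plain Scala. This is not Kafka's implementation, and Rec and LeftJoinClock are hypothetical names; it assumes an unmatched left record is emitted once stream time is strictly greater than its timestamp + window + grace, which fits the t0 + 101 ms record used below but is my reading rather than a documented contract.

```scala
// Toy model (NOT Kafka's implementation) of when an unmatched left-join record is emitted.
final case class Rec(key: String, timestampMs: Long)

final class LeftJoinClock(windowMs: Long, graceMs: Long) {
  private var streamTime = Long.MinValue  // max timestamp seen so far
  private var rights = Vector.empty[Rec]  // right-side ("payments") records
  private var pending = Vector.empty[Rec] // left-side ("bills") records with no match yet

  private def joins(l: Rec, r: Rec): Boolean =
    l.key == r.key && math.abs(l.timestampMs - r.timestampMs) <= windowMs

  def onLeft(l: Rec): Seq[String] = {
    if (!rights.exists(r => joins(l, r))) pending :+= l
    advance(l.timestampMs)
  }

  def onRight(r: Rec): Seq[String] = {
    rights :+= r
    pending = pending.filterNot(l => joins(l, r)) // matched bills are no longer pending
    advance(r.timestampMs)
  }

  // Stream time only moves when a record arrives, and never moves backwards.
  // Returns the keys of unmatched left records whose window has now closed.
  private def advance(ts: Long): Seq[String] = {
    streamTime = math.max(streamTime, ts)
    val (closed, open) = pending.partition(l => l.timestampMs + windowMs + graceMs < streamTime)
    pending = open
    closed.map(_.key)
  }
}
```

Feeding it fred, george and fred's payment, all at time 0, emits nothing; "george" only comes out when a later record (here "percy" at 101 ms) pushes stream time past the window.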
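We can set the timestamps explicitly (or use autoAdvance) and then send one more record whose timestamp lies past the end of the window, so that stream time moves beyond it and the correct behavior is triggered.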
import java.time.Instant
import org.apache.kafka.streams.test.TestRecord

val t0: Instant = Instant.now()
bills.pipeInput(new TestRecord("fred", "100", t0))
bills.pipeInput(new TestRecord("george", "20", t0))
payments.pipeInput(new TestRecord("fred", "95", t0))
// Send an extra record with a later event time. "percy" has no bill, so it produces
// no output, but it moves stream time past the end of george's window (t0 + 100 ms).
payments.pipeInput(new TestRecord("percy", "0", t0.plusMillis(101)))
val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  new KeyValue[String, String]("fred", "5"),
  new KeyValue[String, String]("george", "20")
)
The test now passes. Note that there is no way to advance stream time without sending a new record to one of the input topics. This means that if the data is very sparse, it can take a long time (in wall-clock terms) before unmatched left-join records are emitted.