Using Kafka Streams TopologyTestDriver - how to test left join between two streams

Time:08-31

I am trying to test a stream-stream join with TopologyTestDriver. My goal is to confirm, without running external services, that my topology performs the following left join correctly.

bills
  .leftJoin(payments)(
    {
      case (billValue, null) => billValue
      case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
  )
  .to("debt")

In other words, if we see a bill and a payment within 100ms, the payment should be subtracted from the bill. If we do not see a payment, the debt is simply the bill.

Here is the test code.

val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde

val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)

bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")

// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)

val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  // This record is present
  new KeyValue[String, String]("fred", "5"),
  // This record is missing
  new KeyValue[String, String]("george", "20")
)

Full code available at https://github.com/Oduig/kstreams-left-join-example

Is it possible to test a left join this way?

Answer:

I found out what the problem was. Kafka Streams works with "stream time", which is the highest event time (record timestamp) it has observed so far, not the wall clock. The event time can come from multiple sources.

  1. Explicitly set in the Record itself by the producer.
  2. At runtime, if the event time is not set, the record is stamped with wall-clock time: by the producer client by default, or by the broker if the topic uses LogAppendTime.
  3. At test time, if the event time is not set, the TestInputTopic stamps every record with the same fixed start timestamp, unless you pass autoAdvance to increment it after every record.

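My code was working at runtime because records there get wall-clock timestamps by default, but records piped into the test topics do not. This means that the so-called "stream time" was frozen in my test, and the window for "george" was never closed. Neither driver.advanceWallClockTime nor Thread.sleep helps, because neither changes the timestamps of the records.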
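To make the "frozen stream time" effect concrete, here is a toy model in plain Scala. It is not Kafka's implementation: the names StreamTimeModel, Bill, and Tracker are made up for illustration, and the only assumption is that stream time is the highest timestamp observed so far and that a window closes once stream time moves strictly past its end.

```scala
object StreamTimeModel {
  // Hypothetical record type for this sketch only.
  final case class Bill(key: String, amount: Int, timestampMs: Long)

  // Toy tracker: stream time only moves when a record timestamp is observed.
  final class Tracker(windowMs: Long) {
    private var streamTimeMs: Long = Long.MinValue
    private var pending: Vector[Bill] = Vector.empty

    // Store an unmatched bill, then observe its timestamp.
    def addBill(bill: Bill): Vector[Bill] = {
      pending = pending :+ bill
      observe(bill.timestampMs)
    }

    // Observe a record timestamp and return the bills whose window has closed.
    def observe(timestampMs: Long): Vector[Bill] = {
      streamTimeMs = math.max(streamTimeMs, timestampMs)
      val (closed, open) = pending.partition(b => b.timestampMs + windowMs < streamTimeMs)
      pending = open
      closed
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker(windowMs = 100L)
    // All records carry the same timestamp, as in the failing test.
    println(tracker.addBill(Bill("george", 20, 0L))) // nothing closes
    println(tracker.observe(0L))                     // still nothing, stream time is frozen at 0
    // A later record pushes stream time past the window end.
    println(tracker.observe(101L))                   // george's bill is released
  }
}
```

Sleeping or advancing the wall clock never calls observe in this model, which is why the unmatched "george" record stays pending.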

We can set the time explicitly (or use autoAdvance) to trigger the correct behavior.

import java.time.Instant
import org.apache.kafka.streams.test.TestRecord

val t0: Instant = Instant.now()
bills.pipeInput(new TestRecord("fred", "100", t0))
bills.pipeInput(new TestRecord("george", "20", t0))
payments.pipeInput(new TestRecord("fred", "95", t0))

// Send an extra record with a later event time to push stream time past the 100 ms window
payments.pipeInput(new TestRecord("percy", "0", t0.plusMillis(101)))

val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  new KeyValue[String, String]("fred", "5"),
  new KeyValue[String, String]("george", "20")
)

The test now passes. Note that there is no way to advance stream time without sending a new record to the topic. This means that if the data is very sparse, it can take a long time (on the wall clock) for unmatched left-join records to be emitted.
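For completeness, here is a rough sketch of the autoAdvance alternative. It assumes Kafka Streams 3.x with kafka-streams-test-utils on the classpath. To stay self-contained it rebuilds the join with the Java DSL instead of using the SimpleLeftJoinTopology class from the question, so the object name, the fixed t0, and the 101 ms step are my own choices. The record timestamps end up the same as in the explicit version above (t0, t0, t0, t0 + 101 ms), so it should behave the same way, but I have not run it against every Kafka version.

```scala
import java.time.{Duration, Instant}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsBuilder, TopologyTestDriver}
import org.apache.kafka.streams.kstream.{Consumed, JoinWindows, Produced, StreamJoined, ValueJoiner}

object AutoAdvanceLeftJoinSketch {
  private val string = Serdes.String()

  def run(): java.util.List[KeyValue[String, String]] = {
    // Same left join as in the question, written against the Java DSL.
    val builder = new StreamsBuilder()
    val billStream = builder.stream[String, String]("bills", Consumed.`with`(string, string))
    val paymentStream = builder.stream[String, String]("payments", Consumed.`with`(string, string))
    val joiner: ValueJoiner[String, String, String] =
      (bill: String, payment: String) =>
        if (payment == null) bill else (bill.toInt - payment.toInt).toString
    billStream
      .leftJoin[String, String](
        paymentStream,
        joiner,
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
        StreamJoined.`with`(string, string, string))
      .to("debt", Produced.`with`(string, string))

    val driver = new TopologyTestDriver(builder.build())
    try {
      val t0 = Instant.parse("2024-01-01T00:00:00Z") // arbitrary fixed start time
      // Bills never advance: every bill is stamped t0.
      val bills = driver.createInputTopic(
        "bills", string.serializer(), string.serializer(), t0, Duration.ZERO)
      // Payments advance by 101 ms after each record: t0, t0 + 101 ms, ...
      val payments = driver.createInputTopic(
        "payments", string.serializer(), string.serializer(), t0, Duration.ofMillis(101))
      val debt = driver.createOutputTopic(
        "debt", string.deserializer(), string.deserializer())

      bills.pipeInput("fred", "100")
      bills.pipeInput("george", "20")
      payments.pipeInput("fred", "95")
      // Stamped t0 + 101 ms by autoAdvance; moves stream time past george's window.
      payments.pipeInput("percy", "0")

      debt.readKeyValuesToList()
    } finally driver.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Each TestInputTopic keeps its own clock, so the step has to be configured on a topic that will actually receive a later record. A flushing record is still needed; autoAdvance only saves you from building the TestRecord timestamps by hand.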
