Home > Software engineering >  Embedded Kafka Spring test executes before embedded Kafka is ready
Embedded Kafka Spring test executes before embedded Kafka is ready

Time:11-11

I have a Spring Boot project that has a Kafka listener that I want to test using Embedded Kafka. I have the Kafka Listener log out the message "record received". Which will only be be logged out if I add a Thread.sleep(1000) to the start of the method.

Test class:

@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {

    private static final String TOPIC = "my-topic";

    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Test
    void testSendEvent() throws ExecutionException, InterruptedException {
        // Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
        Producer<Integer, String> producer = configureProducer();
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
        producer.send(producerRecord).get();
        producer.close();
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}

I don't want to use the fickle Thread.sleep() The test is obviously executing before some setup processes have completed. I clearly need to wait on something, but I am not sure what nor how to do it.

Using:

  • Java 11
  • Spring Boot 2.5.6
  • JUnit 5
  • spring-kafka-test 2.7.8

CodePudding user response:

I clearly need to wait on something, but I am not sure what nor how to do it.

You need to use a different method to give Kafka time to process and route the message ...

Look at this line ...

ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);

When testing Kafka listeners we always specify a poll delay. This is because your message is given to kafka, which will then process it in another thread. And you need to wait for it.

Here's how it looks in context of the code its used in.

class UserKafkaProducerTest {
  @Test
  void testWriteToKafka() throws InterruptedException, JsonProcessingException {
      // Create a user and write to Kafka
      User user = new User("11111", "John", "Wick");
      producer.writeToKafka(user);

      // Read the message (John Wick user) with a test consumer from Kafka and assert its properties
      ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
      assertNotNull(message);
      assertEquals("11111", message.key());
      User result = objectMapper.readValue(message.value(), User.class);
      assertNotNull(result);
      assertEquals("John", result.getFirstName());
      assertEquals("Wick", result.getLastName());
  }
}

This is a code piece from this article, which makes stuff clear.

CodePudding user response:

Add an @EventListener bean to the test context and (for example) count down a CountDownLatch when a ConsumerStartedEvent is received; then in the test

assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#events

and

https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption

Or add a ConsumerRebalanceListener and wait for partition assignment.

  • Related