Home > Software design >  Test onFailure of spring-kafka sending message
Test onFailure of spring-kafka sending message

Time:01-11

I try to test the onFailure case when I send a kafka message with producer but the onFailure method is never fire.

Here is my code where I send a message :

@Component
public class MessageSending {

    @Autowired
    Map<String, KafkaTemplate<String, String>> producerByCountry;

    String topicName = "countryTopic";

    public void sendMessage(String data) {
        producerByCountry.get("countryName").send(topicName, data).addCallback(
                onSuccess -> {},
                onFailure -> log.error("failed")
        );
    }
}

Here is the test class but it's still a success case and I have no idea how I can test the failure case (I want to add some processing inside the onFailure block but I would like to first know how I can trigger onFailure by testing).

@EmbeddedKafka
@SpringBootTest
public class MessageSendingTest {

    @MockBean
    Map<Country, KafkaTemplate<String, String>> producerByCountry;

    @Autowired
    EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    MessageSending messageSending;

    @Test
    void failTest(CapturedOutput capturedOutput) {
        var props = KafkaTestUtils.producerProps(embeddedKafka);
        var producerTemplate = new DefaultKafkaProducerFactory<String, String>(props);
        var template = new KafkaTemplate<>(producerTemplate);

        given(producerByCountry.get("USA"))).willReturn(template);

        messageSending.sendMessage("data");

        assertThat(capturedOutput).contains("failed");
        
    }
}

Can someone can help ?

Thanks.

CodePudding user response:

You can use a mock template; see this answer for an example:

How to mock result from KafkaTemplate

CodePudding user response:

I also tried the idea of this topic How to test Kafka OnFailure callback with Junit? by doing

doAnswer(invocationOnMock -> {
    ListenableFutureCallback<SendResult<String, String>> listenableFutureCallback = invocationOnMock.getArgument(0);
    KafkaProducerException value = new KafkaProducerException(new ProducerRecord<String, String>("myTopic", "myMessage"), "error", ex);
    listenableFutureCallback.onFailure(value);
    return null;
}).when(mock(ListenableFuture.class)).addCallback(any(ListenableFutureCallback.class));

But I got this mockito exception org.mockito.exceptions.misusing.UnnecessaryStubbingException due by when().addCallback

  • Related