I try to test the onFailure case when I send a kafka message with producer but the onFailure method is never fire.
Here is my code where I send a message :
@Component
public class MessageSending {
@Autowired
Map<String, KafkaTemplate<String, String>> producerByCountry;
String topicName = "countryTopic";
public void sendMessage(String data) {
producerByCountry.get("countryName").send(topicName, data).addCallback(
onSuccess -> {},
onFailure -> log.error("failed")
);
}
}
Here is the test class but it's still a success case and I have no idea how I can test the failure case (I want to add some processing inside the onFailure block but I would like to first know how I can trigger onFailure by testing).
@EmbeddedKafka
@SpringBootTest
public class MessageSendingTest {
@MockBean
Map<Country, KafkaTemplate<String, String>> producerByCountry;
@Autowired
EmbeddedKafkaBroker embeddedKafka;
@Autowired
MessageSending messageSending;
@Test
void failTest(CapturedOutput capturedOutput) {
var props = KafkaTestUtils.producerProps(embeddedKafka);
var producerTemplate = new DefaultKafkaProducerFactory<String, String>(props);
var template = new KafkaTemplate<>(producerTemplate);
given(producerByCountry.get("USA"))).willReturn(template);
messageSending.sendMessage("data");
assertThat(capturedOutput).contains("failed");
}
}
Can someone can help ?
Thanks.
CodePudding user response:
You can use a mock template; see this answer for an example:
How to mock result from KafkaTemplate
CodePudding user response:
I also tried the idea of this topic How to test Kafka OnFailure callback with Junit? by doing
doAnswer(invocationOnMock -> {
ListenableFutureCallback<SendResult<String, String>> listenableFutureCallback = invocationOnMock.getArgument(0);
KafkaProducerException value = new KafkaProducerException(new ProducerRecord<String, String>("myTopic", "myMessage"), "error", ex);
listenableFutureCallback.onFailure(value);
return null;
}).when(mock(ListenableFuture.class)).addCallback(any(ListenableFutureCallback.class));
But I got this mockito exception org.mockito.exceptions.misusing.UnnecessaryStubbingException
due by when().addCallback