Unit Testing a Kafka SpringBoot producer

Time:06-17

I am trying to write a unit test for my Kafka producer, which is defined in its own file. Here is the producer:

FileName: MessageProducer.java

public boolean sendMessage(ReceivedMessage message) {
    // A local variable cannot be declared private
    final String topicName = "output-flow";
    try {
        logger.info("Sending message: {} to topic: {}", message, topicName);
        // get() blocks until the broker acknowledges the send or the send fails
        kafkaProducer.send(topicName, message).get();
        return true;
    } catch (Exception e) {
        logger.error("Error sending message: {} to topic: {}", message, topicName, e);
        return false;
    }
}

And here is what I have done so far for my unit test, with no success at all:

@Mock
private KafkaTemplate<String, ReceivedMessage > kafkaProducer;

private static final String TRANSACTION_TOPIC = "test";

// Function for parameterized values

@ParameterizedTest
@MethodSource("getTransactionProvider")
public void sendMessageTest(ReceivedMessage message) {
    MessageProducer mockProducer = new MessageProducer(kafkaProducer);
    when(kafkaProducer.send(TRANSACTION_TOPIC, message)).thenReturn({no idea what to put here});
    when(mockProducer.sendMessage(message)).thenReturn(true);
    assertTrue(mockProducer.sendMessage(message));
}

// Test for exception
// Fails too
@ParameterizedTest
@MethodSource("getTransactionProvider")
public void sendMessageTest_ThrowsException(ReceivedMessage message) {
    MessageProducer mockProducer = new MessageProducer(kafkaProducer);
    when(kafkaProducer.send(TRANSACTION_TOPIC, message)).thenThrow(new RuntimeException());
    assertThrows(RuntimeException.class, () -> mockProducer.sendMessage(null));
}

I get this exception: org.opentest4j.AssertionFailedError: Expected java.lang.RuntimeException to be thrown, but nothing was thrown.

CodePudding user response:

If I understood your question correctly, the stubbed send call should return a SendResult populated with the data you expect:

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/SendResult.html

Wrap it in a future of the type that send returns:

ListenableFuture<SendResult<K, V>>

Alternatively, make sendMessage a void method (or have it return the Future itself) and pass in a producer callback that is carried through to send, rather than blocking. Then you can assert on what the callback receives.
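A minimal sketch of the first suggestion, written against the JDK only so that it runs on its own. Sender is a hypothetical stand-in for the single KafkaTemplate send(topic, value) call, and CompletableFuture<String> stands in for the ListenableFuture<SendResult<K, V>> that Spring Kafka 2.x returns (3.x returns a CompletableFuture). It also illustrates a likely reason the stub in the question has no effect: it is registered for the topic "test" while the producer sends to "output-flow", and an unmatched stub on a Mockito mock returns null.

```java
import java.util.concurrent.CompletableFuture;

class StubbedSendSketch {

    /** Hypothetical stand-in for KafkaTemplate#send(topic, value). */
    interface Sender {
        CompletableFuture<String> send(String topic, String message);
    }

    /** Mirrors the question's sendMessage: block on the future, map any failure to false. */
    static boolean sendMessage(Sender sender, String message) {
        final String topicName = "output-flow";
        try {
            sender.send(topicName, message).get();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /** Stub that answers for one topic only and returns null otherwise, like an unmatched Mockito stub. */
    static Sender stubFor(String expectedTopic) {
        return (topic, message) -> expectedTopic.equals(topic)
                ? CompletableFuture.completedFuture("ack")
                : null;
    }

    public static void main(String[] args) {
        // Stub matches the topic the producer really uses: get() returns at once.
        System.out.println(sendMessage(stubFor("output-flow"), "hello")); // true
        // Stub registered for "test": send() yields null, get() throws, the catch returns false.
        System.out.println(sendMessage(stubFor("test"), "hello"));        // false
    }
}
```

In the real test, the same idea would mean stubbing send with the topic the producer actually uses (or with a matcher that accepts any topic) and returning an already completed future.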

CodePudding user response:

Welcome to SO...

Why does your test case fail? Because sendMessage never throws.

The method catches every exception internally and returns a boolean instead:

catch (Exception e) {
    logger.error("Error sending message: {} to topic: {}", message, topicName, e);
    return false;
}

So the test should assert that the method returns false, not that it throws.
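To make that concrete, here is a small JDK-only sketch of the failure path. Sender is again a hypothetical stand-in for KafkaTemplate, and CompletableFuture<String> is an assumed substitute for the future type the real template returns. Both ways a send can fail (the call itself throwing, or the returned future completing exceptionally) end up in the same catch block, so the caller only ever sees false.

```java
import java.util.concurrent.CompletableFuture;

class FailurePathSketch {

    /** Hypothetical stand-in for KafkaTemplate#send(topic, value). */
    interface Sender {
        CompletableFuture<String> send(String topic, String message);
    }

    /** Same control flow as the question's sendMessage. */
    static boolean sendMessage(Sender sender, String message) {
        final String topicName = "output-flow";
        try {
            sender.send(topicName, message).get();
            return true;
        } catch (Exception e) {
            return false; // the exception stops here and never reaches the test
        }
    }

    /** Fails synchronously, like a mock stubbed with thenThrow. */
    static Sender throwingSender() {
        return (topic, message) -> {
            throw new RuntimeException("broker unavailable");
        };
    }

    /** Fails asynchronously: get() throws an ExecutionException. */
    static Sender failedFutureSender() {
        return (topic, message) ->
                CompletableFuture.failedFuture(new RuntimeException("send timed out"));
    }

    public static void main(String[] args) {
        System.out.println(sendMessage(throwingSender(), "hello"));     // false
        System.out.println(sendMessage(failedFutureSender(), "hello")); // false
    }
}
```

With Mockito and JUnit the equivalent would probably look like when(kafkaProducer.send(anyString(), any())).thenThrow(new RuntimeException()) followed by assertFalse(producer.sendMessage(message)). Note that the test in the question stubs send for message but then calls sendMessage(null), so the stubbed arguments do not match the actual call either.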

As I commented earlier, don't block the calling thread by calling get() on the future. Instead, register callbacks that are invoked once the result is available, as follows:

public void sendMessage(ReceivedMessage message) {
    final String topicName = "output-flow";
    try {
        logger.info("Sending message: {} to topic: {}", message, topicName);
        // ListenableFuture is what KafkaTemplate.send returns in Spring Kafka 2.x;
        // the value type must match KafkaTemplate<String, ReceivedMessage>
        ListenableFuture<SendResult<String, ReceivedMessage>> future = kafkaProducer.send(topicName, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, ReceivedMessage>>() {

            @Override
            public void onSuccess(SendResult<String, ReceivedMessage> result) {
                System.out.println("Message Sent  " + result.getRecordMetadata().timestamp());
                // your logic for the success scenario
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println(" sending failed  ");
                // your logic if failed
                throw new RuntimeException("Kafka Failed");
            }
        });
    } catch (Exception e) {
        logger.error("Error sending message: {} to topic: {}", message, topicName, e);
        throw new RuntimeException("Exception occurred", e);
    }
}
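A callback-based sendMessage can be tested by handing it a callback that records what it receives. The sketch below is an assumption-laden illustration, not the Spring API: Sender is a hypothetical stand-in for KafkaTemplate, CompletableFuture.whenComplete plays the role of addCallback, and the BiConsumer callback parameter is invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class CallbackSendSketch {

    /** Hypothetical stand-in for KafkaTemplate#send(topic, value). */
    interface Sender {
        CompletableFuture<String> send(String topic, String message);
    }

    /** Non-blocking variant: the outcome is handed to a caller-supplied callback. */
    static void sendMessage(Sender sender, String message,
                            BiConsumer<String, Throwable> callback) {
        final String topicName = "output-flow";
        sender.send(topicName, message).whenComplete(callback);
    }

    /** Sends once through a succeeding stub and once through a failing stub, recording both outcomes. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        BiConsumer<String, Throwable> recorder = (result, error) ->
                events.add(error == null ? "sent:" + result : "failed:" + error.getMessage());

        sendMessage((t, m) -> CompletableFuture.completedFuture("ack"), "hello", recorder);
        sendMessage((t, m) -> CompletableFuture.failedFuture(new RuntimeException("boom")),
                "hello", recorder);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [sent:ack, failed:boom]
    }
}
```

One design point worth keeping in mind: with a real Kafka producer the callback generally runs on the producer's I/O thread, so an exception thrown inside onFailure does not propagate to whoever called sendMessage. Recording or forwarding the failure, as the recorder does here, is easier to assert on than a throw.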