I currently have a spring cloud stream application that has a listener function that mainly listens to a certain topic and executes the following in sequence:
- Consume messages from a topic
- Store consumed message in the DB
- Call an external service for some information
- Process the data
- Record the results in DB
- Send the message to another topic
- Acknowledge the message (I have the acknowledge mode set to manual)
We have decided to move to Spring cloud function, and I have been already able to already do almost all the steps above using the Function
interface, with the source topic as input and the sink topic as an output.
@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts() {
return message -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
return MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
}
}
My problem goes with exception handling in step 7 (Acknowledge the message). We only acknowledge the message if we are sure that it was sent successfully to the sink queue, otherwise we do no acknowledge the message.
My question is, how can such a thing be implemented within Spring cloud function, specially that the send method is fully dependant on the Spring Framework (as the result of the function interface implementation evaluation).
earlier, we could do this through try/catch
@StreamListener(value = NotificationMesage.INPUT)
public void onMessage(Message<NotificationMessage> message) {
try {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
Message message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
kafkaTemplate.send(message);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
}catch (Exception exception){
notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
}
}
Is there a listener that triggers after the Function interface have returned successfully, something like KafkaSendCallback
but without specifying a template
CodePudding user response:
Building upon what Oleg mentioned above, if you want to strictly restore the behavior in your StreamListener
code, here is something you can try. Instead of using a function, you can switch to a consumer and then use KafkaTemplate
to send on the outbound as you had previously.
@Bean
public Consumer<Message<NotificationMessage>> validatedProducts() {
return message -> {
try{
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
Message message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
kafkaTemplate.send(message); //here, you make sure that the data was sent successfully by using some callback.
//only ack if the data was sent successfully.
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
}
catch (Exception exception){
notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
}
};
}
Another thing that is worth looking into is using Kafka transactions, in which case if it doesn't work end-to-end, no acknowledgment will happen. Spring Cloud Stream binder has support for this based on the foundations in Spring for Apache Kafka. More details here. Here is the Spring Cloud Stream doc on this.
CodePudding user response:
Spring cloud stream has no knowledge of function. It is just the same message handler as it was before, so the same approach with callback as you used before would work with functions. So perhaps you can share some code that could clarify what you mean? I also don't understand what do you mean by ..send method is fully dependant on the Spring Framework..