I have the following @KafkaListener:
@KafkaListener(topicPattern = "SameTopic")
public void onMessage(Message<String> message, Acknowledgment acknowledgment) {
    String eventType = new String((byte[]) message.getHeaders().get("Event-Type"), StandardCharsets.UTF_8);
    switch (eventType) {
        case "create" -> doCreate(message);
        case "update" -> doUpdate(message);
        case "delete" -> doDelete(message);
    }
}
The producer sets a custom header Event-Type with three possible values: create, update, delete. Currently I'm reading this header value from the Message and then invoking the rest of the logic according to it.
Is there any way to create three @KafkaListeners where each of them consumes only the messages matching some criterion, in my case filtered by the value of the Event-Type header? Something like this:
@KafkaListener(topicPattern = "SameTopic", ...)
public void onCreate(Message<String> message, Acknowledgment acknowledgment) {
    doCreate(message);
}

@KafkaListener(topicPattern = "SameTopic", ...)
public void onUpdate(Message<String> message, Acknowledgment acknowledgment) {
    doUpdate(message);
}

@KafkaListener(topicPattern = "SameTopic", ...)
public void onDelete(Message<String> message, Acknowledgment acknowledgment) {
    doDelete(message);
}
I'm aware of RecordFilterStrategy, but couldn't get it to work for this.
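For reference, my understanding is that RecordFilterStrategy would be wired on a dedicated container factory per event type, roughly like this (a sketch with assumed bean names, shown for the create case only):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> createEventFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // RecordFilterStrategy returns true to DISCARD a record,
    // so everything that is not a "create" event is dropped here.
    factory.setRecordFilterStrategy(record -> {
        Header header = record.headers().lastHeader("Event-Type");
        return header == null
                || !"create".equals(new String(header.value(), StandardCharsets.UTF_8));
    });
    // With manual acks, discarded records must still be acknowledged.
    factory.setAckDiscarded(true);
    return factory;
}

Each listener would then reference its own factory via containerFactory, and as far as I can tell would also need its own groupId, otherwise the three listeners share the topic's partitions and each record reaches only one of them. Maybe I'm missing something here.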
CodePudding user response:
Consider mapping those event types to specific partitions of the topic. That way you can have a different @KafkaListener for each type, with a specific partition assigned:
/**
* The topicPartitions for this listener when using manual topic/partition
* assignment.
* <p>
* Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
* @return the topic names or expressions (SpEL) to listen to.
*/
TopicPartition[] topicPartitions() default {};
The doc is here: https://docs.spring.io/spring-kafka/docs/current/reference/html/#manual-assignment
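A minimal sketch of what that could look like, assuming the producer routes create, update and delete events to partitions 0, 1 and 2 (that mapping is an assumption; the producer has to enforce it):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;

// Each listener is manually assigned exactly one partition of the same topic.
@KafkaListener(topicPartitions = @TopicPartition(topic = "SameTopic", partitions = "0"))
public void onCreate(Message<String> message, Acknowledgment acknowledgment) {
    doCreate(message);
}

@KafkaListener(topicPartitions = @TopicPartition(topic = "SameTopic", partitions = "1"))
public void onUpdate(Message<String> message, Acknowledgment acknowledgment) {
    doUpdate(message);
}

@KafkaListener(topicPartitions = @TopicPartition(topic = "SameTopic", partitions = "2"))
public void onDelete(Message<String> message, Acknowledgment acknowledgment) {
    doDelete(message);
}

On the producer side this means picking the partition explicitly, e.g. kafkaTemplate.send("SameTopic", 0, key, value) for create events, or configuring a custom Partitioner.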
It's probably not going to work well with several instances of your app, since with manual assignment there is no consumer group involved. You may consider refining the logic to use 3 different topics. Or, if that is not possible on the producer side, use Kafka Streams' split() to route the original topic into other topics according to the record key.
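A sketch of that split, assuming the event type is carried in the record key; the derived topic names here are made up:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("SameTopic");

// Route each event type to its own derived topic.
stream.split()
        .branch((key, value) -> "create".equals(key),
                Branched.withConsumer(ks -> ks.to("SameTopic-create")))
        .branch((key, value) -> "update".equals(key),
                Branched.withConsumer(ks -> ks.to("SameTopic-update")))
        .branch((key, value) -> "delete".equals(key),
                Branched.withConsumer(ks -> ks.to("SameTopic-delete")))
        .noDefaultBranch();

Each @KafkaListener then consumes its own derived topic with a normal consumer group, so running several instances of the app works as usual.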