I have two microservices that communicate with each other through Kafka. One microservice hosts the listener and the other hosts the sender. The problem is that whenever the producer sends a message, the consumer receives 20 duplicate messages, and I want the consumer to receive only one.
Microservice A:
@Service
public class LocationProducer {
    private static final String MAINLOCATION = "mainlocation";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessage(String message) {
        this.kafkaTemplate.send(MAINLOCATION, message);
    }
}
kafka:
  producer:
    bootstrap-servers: localhost:9092
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
Microservice B:
@Service
public class LocationConsumer {
    @Autowired
    LocationService locationService;
    @KafkaListener(topics = "mainlocation", groupId = "group_id")
    public void consume(String location) {
        locationService.saveLocation(new Location(1, location));
    }
}
kafka:
  consumer:
    bootstrap-servers: localhost:9092
    group-id: group_id
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Docker compose code:
zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  environment:
    KAFKA_ADVERTISED_HOST_NAME: localhost
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_CREATE_TOPIC: "mainlocation:1:3"
Additional information:
When debugging in IntelliJ, if I put a breakpoint on:
this.kafkaTemplate.send(MAINLOCATION,message);
the method is executed twice.
On the consumer side, after the message is received, the service tries to save it to MongoDB and fails with an exception (this might be relevant, I don't know).
I even tried setting auto-commit to true and configuring max.poll, but the problem is the same.
CodePudding user response:
A couple of pointers that might help.
You're setting the consumer's auto-offset-reset to earliest. I'm not familiar with that particular Docker image, but if you're not pruning the volumes between test attempts, it's possible that when you start your application it fetches the records from your previous attempt, which might be causing the listener logic to be executed twice. If you're triggering the send manually, you might set this to latest.
You should check on the producer side whether there are any errors that might be triggering retries. You might also check the topic using Kafka's console tools to see how many records are actually in the topic after the producer is triggered.
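You mention the execution fails. By default, Spring Kafka makes up to 10 delivery attempts (the first attempt plus 9 retries) if record consumption fails. If the producer really does send the message twice, as your breakpoint suggests, that would account for the 20 invocations you see. You should either make your service idempotent or disable retries if that's a problem. You can also specify fatal exceptions that should not be retried.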
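One way to make the listener idempotent is to remember which records have already been processed and skip repeats. The following is only a minimal plain-Java sketch with no Spring or Kafka dependency. The class name IdempotentHandler, the record identifier format (for example "topic-partition-offset"), and the in-memory set are all assumptions made for illustration; a real service would more likely rely on a unique key or an upsert in the database.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper, not part of Spring Kafka: skips records that were already processed.
// Not thread-safe; assumes a single listener thread.
class IdempotentHandler {
    // Identifiers of records whose processing completed successfully.
    private final Set<String> processed = new HashSet<>();
    // The actual work, e.g. saving the location (assumed for illustration).
    private final Consumer<String> delegate;

    IdempotentHandler(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    // recordId is assumed to be unique per record, e.g. "topic-partition-offset".
    // Returns true if the payload was processed, false if it was skipped as a duplicate.
    boolean handle(String recordId, String payload) {
        if (processed.contains(recordId)) {
            return false;
        }
        // May throw; the id is then not recorded, so a later retry is processed again.
        delegate.accept(payload);
        processed.add(recordId);
        return true;
    }
}
```

With this in place, delivering the same record identifier twice invokes the delegate only once, while a record whose processing threw an exception is still eligible for a retry.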
CodePudding user response:
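Problem solved:
@Bean
DefaultErrorHandler eh() {
    // Log the failed record's exception, then give up on it.
    return new DefaultErrorHandler((rec, ex) -> {
        System.out.println(ex.getMessage());
    }, new FixedBackOff(0L, 0L)); // no delay, zero retries
}
This defines an error handler that tells Spring Kafka to stop retrying after an error or exception: FixedBackOff(0L, 0L) means no delay and zero retry attempts, so a failed record is logged once and then skipped.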
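The effect of the retry limit can be illustrated with a small plain-Java simulation. This is not Spring Kafka's implementation: RetrySimulation, deliver, totalCalls, and the maxRetries parameter are hypothetical stand-ins, and the sketch assumes a handler that always fails, as the MongoDB save did in the question.

```java
import java.util.function.Consumer;

// Plain-Java simulation of redelivery; hypothetical, not Spring Kafka code.
class RetrySimulation {
    // Delivers one record, retrying up to maxRetries times after a failure.
    // Returns how many times the handler was invoked.
    static int deliver(String record, Consumer<String> handler, int maxRetries) {
        int calls = 0;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            calls++;
            try {
                handler.accept(record);
                break; // success: stop retrying
            } catch (RuntimeException e) {
                // failure: loop again until the retries are exhausted
            }
        }
        return calls;
    }

    // Total handler invocations when the same message is sent `sends` times.
    static int totalCalls(int sends, Consumer<String> handler, int maxRetries) {
        int total = 0;
        for (int i = 0; i < sends; i++) {
            total += deliver("location", handler, maxRetries);
        }
        return total;
    }
}
```

Under these assumptions, totalCalls(2, alwaysFails, 9) returns 20 (two sends, each attempted 10 times), while totalCalls(2, alwaysFails, 0) returns 2, where alwaysFails is a handler that throws on every call. The second send is a separate issue on the producer side that disabling retries does not address.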