I need to create an application using stateful retry that listens to a Kafka topic, makes calls to some APIs, and then commits the message. If one of these calls fails, for example with a timeout, the application must retry 4 times at 4-second intervals. If it still fails after those four attempts, the application should send the message to a DLQ topic.
Sending to the DLQ topic is the part I can't get working: when I tried to configure the DLQ, the retries didn't stop and nothing was sent to the DLQ either.
@KafkaListener(topics = "${topic.name}", concurrency = "1")
public void listen(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem,
        @Headers final MessageHeaders headers,
        Acknowledgment ack) {
    AberturaContaLimiteCreditoCalculadoData dados;
    if (!validarMensagem(mensagem)) {
        dados = mensagem.value().getData();
        // This RuntimeException is thrown only to force a retry.
        throw new RuntimeException();
        //ack.acknowledge();
    }
}

// Returns true when the record or its value is missing.
private boolean validarMensagem(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem) {
    return mensagem == null || mensagem.value() == null;
}
KafkaConfig:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    // FixedBackOff(interval, maxAttempts): 4 s between attempts, up to 4 retries after the first delivery.
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            publisherRetryDLQ(),
            new FixedBackOff(4000L, 4L)));
    return factory;
}

public DeadLetterPublishingRecoverer publisherRetryDLQ() {
    // Always publish failed records to partition 0 of the DLQ topic.
    return new DeadLetterPublishingRecoverer(createKafkaTemplate(),
            (record, ex) -> new TopicPartition(topicoDLQ, 0));
}

public ProducerFactory<String, String> producerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    return new DefaultKafkaProducerFactory<>(config);
}

public KafkaOperations<String, String> createKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
Edit 2022-05-04:
Thanks to your tip about using a RetryListener and setting logging.level to DEBUG, we found the problem: the Producer was not being built.
The problem now is that the record we consume uses a different Avro schema from the one the DLQ expects. The difference is that the DLQ schema has an extra field that must store the reason for the error.
2022/05/04 16:53:43.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [ERROR] o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication to limites-abertura-conta-limite-credito-calculado-convivenciaaberturaconta-dlq failed for: limites-abertura-conta-limite-credito-calculado-0@6
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{...}]}}]}
Is there a way to do this conversion?
CodePudding user response:
If I understand the question properly, you want to create a ProducerRecord with a different value type.
Simply subclass the DLPR and override createProducerRecord().
/**
* Subclasses can override this method to customize the producer record to send to the
* DLQ. The default implementation simply copies the key and value from the consumer
* record and adds the headers. The timestamp is not set (the original timestamp is in
* one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
* less than 0, it must be set to null in the {@link ProducerRecord}.
* @param record the failed record
* @param topicPartition the {@link TopicPartition} returned by the destination
* resolver.
* @param headers the headers - original record headers plus DLT headers.
* @param key the key to use instead of the consumer record key.
* @param value the value to use instead of the consumer record value.
* @return the producer record to send.
* @see KafkaHeaders
*/
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {
You can examine the headers to determine the exception that caused the failure. If you need the actual exception, override accept() to capture it in a ThreadLocal, then call super.accept(); you can then use the thread local in createProducerRecord().
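The ThreadLocal hand-off described above can be sketched in plain Java. This is only an illustration of the pattern, not Spring Kafka code: BaseRecoverer is a hypothetical stand-in for DeadLetterPublishingRecoverer, records are modelled as simple strings and maps, and the "reason" key stands for the extra DLQ field. In a real subclass you would override the accept() overload that your Spring Kafka version actually calls and build the DLQ Avro object inside createProducerRecord().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the pattern; none of these types are the Spring Kafka API.
class DlqEnrichmentSketch {

    /** Hypothetical stand-in for DeadLetterPublishingRecoverer. */
    static class BaseRecoverer {
        Map<String, String> lastPublished; // what would have been sent to the DLQ

        void accept(String value, Exception ex) {
            lastPublished = createProducerRecord(value);
        }

        // Default behaviour: copy the failed value unchanged.
        Map<String, String> createProducerRecord(String value) {
            Map<String, String> out = new LinkedHashMap<>();
            out.put("data", value);
            return out;
        }
    }

    /** Subclass that adds the failure reason, mirroring the extra DLQ field. */
    static class ReasonAddingRecoverer extends BaseRecoverer {
        private static final ThreadLocal<Exception> FAILURE = new ThreadLocal<>();

        @Override
        void accept(String value, Exception ex) {
            FAILURE.set(ex); // capture before the base class builds the record
            try {
                super.accept(value, ex);
            } finally {
                FAILURE.remove(); // do not leak state to the next record on this thread
            }
        }

        @Override
        Map<String, String> createProducerRecord(String value) {
            Map<String, String> out = super.createProducerRecord(value);
            Exception ex = FAILURE.get();
            out.put("reason", ex == null ? "unknown" : String.valueOf(ex.getMessage()));
            return out;
        }
    }

    public static void main(String[] args) {
        ReasonAddingRecoverer recoverer = new ReasonAddingRecoverer();
        recoverer.accept("payload-1", new RuntimeException("timeout calling API"));
        System.out.println(recoverer.lastPublished);
    }
}
```

Clearing the ThreadLocal in a finally block matters because the listener container reuses its consumer thread for later records.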
There are several ways to publish the different type with the same producer factory:
- you can use a DelegatingByTypeSerializer; see "KafkaConsumer With Multiple Different Avro Producers And Transactions" for an example
- you can configure the publisher with a KafkaTemplate that uses a different serializer (it has a constructor where you can override the producer factory configs).