Our infrastructure has to manage the exchange of information between two Spring Boot applications. Some of that information is stored in an Oracle DB. We use Kafka to notify the receiver which information it has to process.
(NOTE: I had to generalize the code)
We have a Kafka producer, configured like this:
@Configuration
@Lazy(false)
public class KafkaConfig {
    //...
    @Bean(name="trManagerJpaKafka")
    public ChainedKafkaTransactionManager<Object, Object> chainedTm(KafkaTransactionManager<String, String> ktm,
            JpaTransactionManager jpaTransactionManager) {
        return new ChainedKafkaTransactionManager<>(jpaTransactionManager,ktm);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(KafkaOperations<?, ?> template,
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory, ChainedKafkaTransactionManager<Object, Object> chainedTm) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        //...
        factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer,new FixedBackOff(0L, 2)));
        factory.getContainerProperties().setTransactionManager(chainedTm);
        return factory;
    }
}
Then we have some services that produce Kafka messages, something like this:
public class SomeService implements ISomeService {
    @Override
    @Transactional(transactionManager = "trManagerJpaKafka" , rollbackFor = { Exception.class })
    public boolean createMessage(Long idIstituto, CreateRDM createRDM) {
        ...
        Entity entity = new Entity();
        entity.setSomething(something);
        entity.setSomethingElse(somethingElse);
        entity = entityRepository.save(entity);
        JSONObject message = new JSONObject();
        try {
            message.put("id", entity.getId());
            kafkaProducerService.sendMessage(message.toString());
            ...
        }
        ...
}
public class SomeGenericKafkaService implements ISomeGenericKafkaService {
    ...
    public void sendMessage(String data) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put(KafkaHeaders.TOPIC, someTopic);
        headers.put(KafkaHeaders.MESSAGE_KEY, UUID.randomUUID().toString());
        KafkaProducerCallback callback = new KafkaProducerCallback();
        kafkaTemplate.send(new GenericMessage<String>(data, headers)).addCallback(callback);
        if (callback.isError()) {
            throw new KafkaException(callback.getThrowable());
        }
    }
    ...
}
In the consumer Spring Boot application we sometimes face this issue: it consumes the message from Kafka, but the record is not yet present in the DB, so we get an exception when we try to access it. Is the transaction in the producer not atomic across both JPA and Kafka? Is the Kafka message committed before the JPA insert?
In the meantime, as an emergency workaround for the exception when retrieving data from Oracle, we added a retry loop in the receiver, something like this:
...
Optional<Entity> optionalEntity = entityRepository.findById(id);
if (!optionalEntity.isPresent()) {
    for (int i = 0; i < kafkaDbMaxRetry; i++) {
        Thread.sleep(kafkaMessageDelay);
        optionalEntity = entityRepository.findById(id);
        if (optionalEntity.isPresent()) break;
    }
}
entity = optionalEntity.get();
...
How can we adjust the producer so that we can avoid this workaround in the consumer?
CodePudding user response:
Kafka processes the events of a single partition sequentially, but events can be processed out of order if they end up in different partitions. You can therefore send your messages with, for example, the primary key of the DB record as the message key, so that related messages land in the same partition.
Kafka uses the key to determine the target partition. The default strategy chooses a partition based on a hash of the key; if the key is null, records are spread across partitions (round-robin in older clients, sticky partitioning since Kafka 2.4).
You could also implement a custom org.apache.kafka.clients.producer.Partitioner to map messages to the partitions you need. The name of your class must be set as the partitioner.class property of the producer.
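To make the key-to-partition idea concrete, here is a minimal, self-contained sketch of how a stable key always lands in the same partition. It is an illustration only, not Kafka's real partitioner: Kafka hashes the serialized key with murmur2, while this sketch uses Arrays.hashCode as a stand-in, and the class name KeyPartitionSketch, the partition count, and the key value are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration of key-based partition selection.
// Assumption: Arrays.hashCode stands in for Kafka's murmur2 hash,
// so the partition numbers will differ from what a real broker uses.
class KeyPartitionSketch {

    // Map a record key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6;   // hypothetical partition count
        String entityId = "42";  // hypothetical DB primary key used as record key

        // The same key yields the same partition on every call.
        System.out.println(partitionFor(entityId, numPartitions));
        System.out.println(partitionFor(entityId, numPartitions));
    }
}
```

By contrast, a fresh random key per message (such as the UUID.randomUUID() used in sendMessage in the question) gives no such guarantee: two messages about the same entity may end up in different partitions.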
CodePudding user response:
The chained transaction manager only provides "Best Effort 1 Phase Commit". Kafka cannot participate in a JTA/XA transaction, so it is impossible to provide atomic updates across these disparate technologies.
See https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html for more information.
You have to make your listener idempotent - a common pattern is to store the topic/partition/offset along with the data so you can check if it has already been processed.
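The topic/partition/offset pattern might look roughly like the sketch below. It is a plain-Java illustration under stated assumptions: the class IdempotentListenerSketch and its methods are hypothetical, and the in-memory Set stands in for a DB table with a unique constraint on (topic, partition, offset). In a real listener the marker and the business data would be written in the same DB transaction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of an idempotent consumer: each record is identified by
// topic/partition/offset, and a record seen before is skipped.
// Assumption: the Set stands in for a DB table with a unique constraint.
class IdempotentListenerSketch {

    private final Set<String> processed = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    // Returns true if the payload was applied, false if it was a redelivery.
    boolean handle(String topic, int partition, long offset, String payload) {
        // ':' is not a legal character in Kafka topic names, so the id is unambiguous.
        String recordId = topic + ":" + partition + ":" + offset;
        if (!processed.add(recordId)) {
            return false; // already processed, skip the duplicate
        }
        applied.add(payload); // stand-in for the real business logic
        return true;
    }

    int appliedCount() {
        return applied.size();
    }

    public static void main(String[] args) {
        IdempotentListenerSketch listener = new IdempotentListenerSketch();
        System.out.println(listener.handle("some-topic", 0, 7L, "{\"id\":1}"));
        // Same topic/partition/offset delivered again: not applied a second time.
        System.out.println(listener.handle("some-topic", 0, 7L, "{\"id\":1}"));
        System.out.println(listener.appliedCount());
    }
}
```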