Let's say I have a @KafkaListener class with a @KafkaHandler method inside that processes any received messages and does some DB operations.
I want to have fine-grained control over how and when to commit (or rollback) the database changes (i.e. manually manage the DB transaction) in this class. The consumed message offset can be committed regardless of the DB transaction result.
Here is a simplified version of what I have:
@Service
@RequiredArgsConstructor
@KafkaListener(
        topics = "${kafka.topic.foo}",
        groupId = "${spring.kafka.consumer.group-id-foo}",
        containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {
    // ...
    private final EntityManager entityManager;

    @KafkaHandler
    public void handleMessage(FooMessage msg) {
        // ...
        handleDBOperations(msg);
        // ...
    }

    void handleDBOperations(FooMessage msg) {
        try {
            // This is the call that throws the IllegalStateException
            entityManager.getTransaction().begin();
            // ...
            entityManager.getTransaction().commit();
        } catch (Exception e) {
            log.error(e.getLocalizedMessage(), e);
            entityManager.getTransaction().rollback();
        }
    }
}
When a message is received and entityManager.getTransaction().begin(); is invoked, this results in an exception:
java.lang.IllegalStateException: Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT instead
Why am I not allowed to create a transaction here?
And if I remove EntityManager and add @Transactional annotations to methods with DB operations (though this is not exactly what I want), then it results in another exception:
TransactionRequiredException: Executing an update/delete query
It seems like the annotation is completely ignored. Is this somehow related to the Kafka consumer having its own transaction management?
In short, what am I doing wrong here and how can I manage DB transactions in a @KafkaHandler method?
Any help is appreciated. Thanks in advance.
CodePudding user response:
Try using Spring's TransactionTemplate: https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch10s06.html
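As a rough sketch of what that could look like for the consumer in the question: the injected EntityManager is a shared, container-managed proxy, which is why calling getTransaction() on it is rejected, so the transaction boundary is drawn with TransactionTemplate instead. This assumes Spring 5.2 or later (for executeWithoutResult), that the injected PlatformTransactionManager is the JPA one (with several transaction managers in the context a @Qualifier would probably be needed), and jakarta.persistence imports (javax.persistence on older versions). FooMessage and the elided DB work are taken from the question as-is.

```java
import jakarta.persistence.EntityManager; // javax.persistence on older Spring versions
import jakarta.persistence.PersistenceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

@Service
@KafkaListener(
        topics = "${kafka.topic.foo}",
        groupId = "${spring.kafka.consumer.group-id-foo}",
        containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {

    private static final Logger log = LoggerFactory.getLogger(FooMessageConsumer.class);

    @PersistenceContext
    private EntityManager entityManager;

    private final TransactionTemplate transactionTemplate;

    // Assumption: this resolves to the JPA transaction manager.
    public FooMessageConsumer(PlatformTransactionManager transactionManager) {
        this.transactionTemplate = new TransactionTemplate(transactionManager);
    }

    @KafkaHandler
    public void handleMessage(FooMessage msg) {
        // ...
        handleDBOperations(msg);
        // ...
    }

    void handleDBOperations(FooMessage msg) {
        try {
            // The template begins a transaction, runs the callback, then commits.
            // A RuntimeException thrown from the callback triggers a rollback.
            transactionTemplate.executeWithoutResult(status -> {
                // ... DB operations using entityManager ...

                // To roll back without throwing, mark the transaction explicitly:
                // status.setRollbackOnly();
            });
        } catch (RuntimeException e) {
            // By this point the transaction has already been rolled back.
            log.error(e.getLocalizedMessage(), e);
        }
    }
}
```

Because the exception is caught inside the listener, the DB outcome does not stop the container from committing the message offset, which matches the requirement in the question.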
If your use case is (that) simple, Spring's declarative transaction management should also let you achieve the behavior you ask for: https://docs.spring.io/spring-framework/docs/3.0.0.M3/reference/html/ch11s05.html
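One likely reason the @Transactional annotation looked ignored, assuming the annotated method was called from handleMessage inside the same class as in the question's snippet: in Spring's default proxy mode the transactional behavior lives in a proxy around the bean, and a call made through this never passes through that proxy. The following is a plain-JDK illustration of that effect, not Spring code. MessageHandler, FooHandler and withFakeTransactions are hypothetical names, and the "transaction" is only a pair of recorded events.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationDemo {

    interface MessageHandler {
        void handleMessage(String msg);
        void handleDbOperations(String msg);
    }

    static class FooHandler implements MessageHandler {
        private final List<String> events;

        FooHandler(List<String> events) {
            this.events = events;
        }

        @Override
        public void handleMessage(String msg) {
            // A call on "this": it goes straight to the target, not through the proxy.
            handleDbOperations(msg);
        }

        @Override
        public void handleDbOperations(String msg) {
            events.add("db:" + msg);
        }
    }

    // Stand-in for a transactional proxy: wraps only handleDbOperations.
    static MessageHandler withFakeTransactions(MessageHandler target, List<String> events) {
        InvocationHandler handler = (proxy, method, args) -> {
            boolean transactional = method.getName().equals("handleDbOperations");
            if (transactional) events.add("begin");
            try {
                return method.invoke(target, args);
            } finally {
                if (transactional) events.add("commit");
            }
        };
        return (MessageHandler) Proxy.newProxyInstance(
                MessageHandler.class.getClassLoader(),
                new Class<?>[] {MessageHandler.class},
                handler);
    }

    // The listener entry point is called on the proxy, the DB method via "this".
    static List<String> viaHandleMessage() {
        List<String> events = new ArrayList<>();
        withFakeTransactions(new FooHandler(events), events).handleMessage("m1");
        return events;
    }

    // The DB method itself is called on the proxy.
    static List<String> viaProxyDirectly() {
        List<String> events = new ArrayList<>();
        withFakeTransactions(new FooHandler(events), events).handleDbOperations("m1");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(viaHandleMessage());   // no begin/commit recorded
        System.out.println(viaProxyDirectly());   // begin and commit recorded
    }
}
```

In a Spring application the usual way around this is to move the DB method into a separate bean, make it public, annotate it with @Transactional there, and call it from the @KafkaHandler method through the injected bean so the call passes through the proxy.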