Home > Software engineering >  How to use EntityManager/Hibernate Transaction with @KafkaHandler
How to use EntityManager/Hibernate Transaction with @KafkaHandler

Time:12-28

Let's say I have a @KafkaListener class with a @KafkaHandler method inside that processes any received messages and does some DB operations.

I want to have fine-grained control over how and when to commit (or rollback) the database changes (i.e. manually manage the DB transaction) in this class. The consumed message offset can be committed regardless of the DB transaction result.

Here is a simplified version of what I have:

@Service
@RequiredArgsConstructor
@KafkaListener(
    topics = "${kafka.topic.foo}",
    groupId = "${spring.kafka.consumer.group-id-foo}",
    containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {

  // ...
  private final EntityManager entityManager;

  @KafkaHandler
  public void handleMessage(FooMessage msg) {
    // ...
    handleDBOperations(msg);
    // ...
  }

  void handleDBOperations(msg) {
    try {
      entityManager.getTransaction().begin();

      // ...

      entityManager.getTransaction().commit();
    } catch (Exception e) {
      log.error(e.getLocalizedMessage(), e);
      entityManager.getTransaction().rollback();
    }
  }

}

When a message is received and entityManager.getTransaction().begin(); is invoked, this results in an exception:

java.lang.IllegalStateException: Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT instead

Why am I not allowed to create a transaction here?

And if I remove EntityManager and add @Transactional annotations to methods with DB operations (though this is not exactly what I want), then it results in another exception:

TransactionRequiredException Executing an update/delete query

It seems like it completely ignores the annotation. Is this somehow related to Kafka consumer having its own transaction management?

In short, what am I doing wrong here and how can I manage DB transactions in a @KafkaHandler method?

Any help is appreciated. Thanks in advance.

CodePudding user response:

Try using Springs TransactionTemplate: https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch10s06.html

If your use case is (that) simple, Springs declarative transaction management should also let you achieve the behavior you ask for: https://docs.spring.io/spring-framework/docs/3.0.0.M3/reference/html/ch11s05.html

  • Related