Home > Blockchain >  Spring Boot JPA interaction from reactive AMQP background thread
Spring Boot JPA interaction from reactive AMQP background thread

Time:10-27

We have a regular, blocking Spring Boot (2.5.4) application serving a REST interface. We use Hibernate envers for audit logging. This service sends messages on our Service Bus in an asynchronous fashion via the ServiceBusSenderAsyncClient from the Azure SDK. Now this service needs to receive messages from said Service Bus and write updates to the Database - also async. For this we use ServiceBusReceiverAsyncClient with a reactive stream. All of this generally works (I wrapped the whole database interaction in one Transactional function) but I'm encountering issues with envers via the beforeTransactionCompletion callback being null (when I deactivate envers, it works). This issues is Service Bus independent from my understanding and really rotates around accessing the JPA repo from a background thread. Following the schematic implementation:

ServiceBusReceiveController

@Component
public class SubscriptionReceiveController {
  @Autowired
  private MyService myService;

  @Autowired
  private ServiceBusReceiverAsyncClient receiver;

  @Autowired
  private ObjectMapper mapper;

  @PostConstruct
  public void run() {
    // continuously receive messages in the background
    receiver.receiveMessages().flatMap(message -> {
      MyObject object;
      try {
        object = mapper.readValue(message.getBody().toString(), MyClass.class);
      } catch (JsonProcessingException e) {
        throw new JsonProcessingRuntimeException("Failed to convert message to object", e);
      }

      // wrap the blocking JPA interaction
      Mono<MyObject> blockingWrapper = Mono.fromCallable(() -> {
        return myService.updateMyObject(object.getId());
      });

      // When the service returns, this is where it crashes
      return blockingWrapper.subscribeOn(Schedulers.boundedElastic());
    }).flatMap(response -> {
      // More processing steps
    }).subscribe();
  }
}

MyService

@Service
public class MyService {

  @Autowired
  private MyRepository myRepository;

  @Transactional
  public MyObject updateMyObject(UUID objectId) {
    var object = myRepository.getById(objectId);
    object.setSomeProperty("myPropertyValue");

    return myRepository.save(object);
  }
}

Causing following error message:

org.springframework.orm.jpa.JpaSystemException: Unable to perform beforeTransactionCompletion callback: null; nested exception is org.hibernate.HibernateException: Unable to perform beforeTransactionCompletion callback: null
Caused by: org.hibernate.HibernateException: Unable to perform beforeTransactionCompletion callback: null
        at org.hibernate.engine.spi.ActionQueue$BeforeTransactionCompletionProcessQueue.beforeTransactionCompletion(ActionQueue.java:960) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.engine.spi.ActionQueue.beforeTransactionCompletion(ActionQueue.java:525) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2381) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:448) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:562) ~[spring-orm-5.3.9.jar:5.3.9]
        ... 20 common frames omitted

...

Caused by: java.lang.NullPointerException: null
        at com.my.package.revision.AuditRevisionListener.newRevision(AuditRevisionListener.java:10) ~[classes/:na]
        at org.hibernate.envers.internal.revisioninfo.DefaultRevisionInfoGenerator.generate(DefaultRevisionInfoGenerator.java:88) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcess.getCurrentRevisionData(AuditProcess.java:133) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcess.executeInSession(AuditProcess.java:115) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcess.doBeforeTransactionCompletion(AuditProcess.java:174) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcessManager$1.doBeforeTransactionCompletion(AuditProcessManager.java:47) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.engine.spi.ActionQueue$BeforeTransactionCompletionProcessQueue.beforeTransactionCompletion(ActionQueue.java:954) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        ... 28 common frames omitted

What am I missing here, why is the transaction not fully finished within the Service implementation? Is there a better way/pattern for a background thread to write to a database via JPA?

Thanks

CodePudding user response:

null pointer exception is here: com.my.package.revision.AuditRevisionListener.newRevision(AuditRevisionListener.java:10) nothing to do with transactions?

CodePudding user response:

Gosh, yes thank you that was it, totally overlooked it. Was caught up in the transaction... Just for clarification, the above posted code works.

  • Related