We have a system that sells vouchers, and the selling process must be integrated with another system. The integration happens through AWS SQS queues.
1. System A processes the order and, at the end of the process, publishes a message to the SQS queue called new-orders-queue.
2. System B reads the message from new-orders-queue, does some processing, and then publishes another event to another SQS queue called another-sqs-queue.
3. System A reads the message from another-sqs-queue and then updates the order created in step 1.
The ordering process (step 1 above) is big, but not tremendously complex. It runs some validations against its database (MySQL) and then inserts rows into a few tables.
All of this happens in a Spring @Transactional context.
The problem is that step 3 sometimes happens before the order from step 1 has been committed to the database, which leads to an error: the order to be updated is not found in the database because it hasn't been committed yet. If we retry a second later, the process works normally. This doesn't happen every time, but we have to address it.
Have you seen this already?
Below is (heavily) reduced pseudo-code for step 1:
@Transactional
public Result handleNewOrder(OrderData data) {
    SqsClient sqsClient = new SqsClient();
    validatePrices(data);
    doSomeInserts(data);
    Result result = createResult(data);
    // the last line of the method, just before the return statement, posts the event to the queue
    sqsClient.sendEvent(Events.create(result));
    return result;
}
At the end of this method annotated with @Transactional, things should be committed, but somehow step 3 completes before the commit happens (at least it seems so).
Maybe moving the event publishing out of the transactional boundary is the solution (and I'm actually in favor of it), because that way the event is only published, and therefore only processed, after the transaction has been committed to the database. But we would then need some sort of retry mechanism in case our communication with SQS fails.
Is this the way to go, or do you have a better solution?
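To make the "some sort of retry mechanism" part concrete, here is a minimal sketch of a retry helper with exponential backoff in plain Java. The class name Retry, the method withBackoff and its parameters are hypothetical and not part of the AWS SDK or Spring; the Supplier passed in stands in for whatever call actually sends the message to SQS.

```java
import java.util.function.Supplier;

/** Hypothetical helper (not part of any SDK): retries an action with exponential backoff. */
final class Retry {
    private Retry() {}

    /**
     * Runs the action up to maxAttempts times, doubling the delay after each failure.
     * Rethrows the last failure if every attempt fails.
     */
    static <T> T withBackoff(Supplier<T> action, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // no point in sleeping after the final attempt
                }
                sleep(delay);
                delay *= 2;
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Stand-in for the real SQS call: fails twice, then succeeds.
        String outcome = withBackoff(() -> {
            if (++attempts[0] < 3) {
                throw new RuntimeException("simulated SQS failure");
            }
            return "sent after " + attempts[0] + " attempts";
        }, 5, 10);
        System.out.println(outcome); // prints "sent after 3 attempts"
    }
}
```

Note that retrying in-process only covers short outages. If the application dies between the commit and a successful send, the event is lost, so a more durable approach (for example, storing pending events in a table inside the same transaction and sending them later) may be worth considering.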
CodePudding user response:
This sounds like it might be an operation that requires multiple transactions.
For example, you might have two methods, each annotated with @Transactional:
@Transactional
public void startHandleNewOrder(OrderData data) {
    // make changes to the database here and publish event to new-orders-queue
}

@Transactional
public Result finishHandleNewOrder(OrderData data) {
    // await response from another-sqs-queue and compile result
}
This should work, assuming that:
- A separate service that is NOT annotated with @Transactional (i.e. it is outside the transactional boundary) calls these methods in order.
Alternatively, you could implement this without annotations, like so:
@Autowired
PlatformTransactionManager transactionManager;

@PersistenceContext
EntityManager entityManager;

public Result handleNewOrder(OrderData data) {
    boolean rollback = true;
    TransactionStatus status = getTransaction();
    try {
        // make changes to the database here and publish event to new-orders-queue
        // commit/rollback go through the transaction manager; TransactionStatus has no such methods
        transactionManager.commit(status);
        rollback = false;
    } finally {
        if (rollback) {
            transactionManager.rollback(status);
        }
    }

    // this may or may not be necessary if you want to ensure you're reading
    // fresh data from the database (otherwise cached data from step #1 may be used)
    entityManager.clear();

    rollback = true;
    status = getTransaction();
    try {
        // wait for a response and compile the result
        Result result = compileResult(data); // hypothetical helper, not shown here
        transactionManager.commit(status);
        rollback = false;
        return result;
    } finally {
        if (rollback) {
            transactionManager.rollback(status);
        }
    }
}

// starts a new, independent transaction for each step
private TransactionStatus getTransaction() {
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    return transactionManager.getTransaction(def);
}
CodePudding user response:
In the end, as suggested by M. Deinum, I've implemented @TransactionalEventListener with the default phase (TransactionPhase.AFTER_COMMIT).
Something like this:
@TransactionalEventListener(classes = {SellVoucherEvent.class})
public void dispatch(SellVoucherEvent event) {
    sqsClient.sendMessage("queue", turnEventToString(event));
}
This method is implemented in a @Component class, and from my transactional context I publish the event via an ApplicationEventPublisher (which is injected by Spring).
Example:
private final ApplicationEventPublisher publisher; // injected by Spring

@Transactional
public Result handleNewOrder(OrderData data) {
    validatePrices(data);
    doSomeInserts(data);
    Result result = createResult(data);
    SellVoucherEvent event = createEvent();
    publisher.publishEvent(event); // publish the application event
    return result;
}
Then, after the commit, the dispatch method annotated with @TransactionalEventListener is invoked and the event is sent to SQS. This way we can guarantee that the event will only be processed after the commit.
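For readers who want to see why deferring the send until after the commit removes the race, here is a toy model in plain Java. It is NOT Spring and not how Spring implements AFTER_COMMIT internally; ToyTransaction, AfterCommitDemo and the "order-1" key are hypothetical names used only for illustration. The real race is timing-dependent, whereas this sketch makes it deterministic by delivering the message immediately.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a transaction (an assumption for illustration, not Spring's implementation). */
final class ToyTransaction {
    private final Map<String, String> committedStore; // what other readers can see
    private final Map<String, String> pendingWrites = new HashMap<>();
    private final List<Runnable> afterCommitHooks = new ArrayList<>();

    ToyTransaction(Map<String, String> committedStore) {
        this.committedStore = committedStore;
    }

    /** Buffers a write; it is invisible to other readers until commit(). */
    void insert(String id, String value) {
        pendingWrites.put(id, value);
    }

    /** Registers work that must run only once the data is visible. */
    void afterCommit(Runnable hook) {
        afterCommitHooks.add(hook);
    }

    void commit() {
        committedStore.putAll(pendingWrites);
        pendingWrites.clear();
        afterCommitHooks.forEach(Runnable::run); // hooks run after the data is visible
        afterCommitHooks.clear();
    }

    void rollback() {
        pendingWrites.clear();
        afterCommitHooks.clear(); // nothing is sent for a rolled-back transaction
    }
}

final class AfterCommitDemo {
    /** Returns what the consumer observes when the message reaches it. */
    static List<String> run(boolean sendAfterCommit) {
        Map<String, String> db = new HashMap<>();
        List<String> seenByConsumer = new ArrayList<>();
        // The consumer looks up the order at the moment the message arrives.
        Runnable deliver = () ->
                seenByConsumer.add(db.containsKey("order-1") ? "found" : "not found");

        ToyTransaction tx = new ToyTransaction(db);
        tx.insert("order-1", "NEW");
        if (sendAfterCommit) {
            tx.afterCommit(deliver); // deferred, like an after-commit listener
        } else {
            deliver.run();           // sent inside the transaction, before the commit
        }
        tx.commit();
        return seenByConsumer;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [not found]
        System.out.println(run(true));  // prints [found]
    }
}
```

The same model also shows a side benefit of the deferred approach: if the transaction rolls back, the hook never runs, so no event is sent for an order that does not exist.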