I am working on a Node project that uses the pg library for database operations. I consume order events from a Kafka queue and store them in the database. Every time an order is updated a new event is generated, so I need to delete the old order details and replace them with the new ones.
Below is the code:
async function saveOrders(orders: Array<TransactionOnOrders>) {
  const client = await pool.connect()
  try {
    await client.query('BEGIN')
    if (orders.length) {
      const deleted = await deleteOrders(client, orders[0].orderId)
      logger.debug(`deleted rowCount ${deleted.rowCount} ${orders[0].orderId}`)
    }
    const queries = orders.map(ord => saveTransactionOnOrders(client, ord))
    await Promise.all(queries)
    await client.query('COMMIT')
  } catch (e) {
    await client.query('ROLLBACK')
    throw e
  } finally {
    client.release()
  }
}
The orders are updated very frequently and we receive a lot of events, which creates a race condition: old records are not deleted and extra records get inserted. For example, say we receive an event for order123 and its transaction is still in progress when another event for order123 arrives. The second event's delete query then returns 0 rows affected and its insert adds another row, leaving two rows where there should be only one.
I tried changing the isolation level, but that didn't work well and resulted in an error:
await client.query('BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ')
await client.query('BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE')
Am I making a mistake here, or is there a better way to handle this situation?
CodePudding user response:
This would be easier if you updated the rows rather than deleting and recreating them. That way you can rely on row locks to prevent concurrent updates; a sketch of that approach follows.
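A minimal sketch of the idea, assuming a hypothetical transactions_on_orders table and made-up columns (status, amount) beyond the orderId shown in the question. The UPDATE takes a row lock, so a concurrent handler touching the same row waits until the first transaction commits or rolls back instead of racing a delete/insert pair:

import { PoolClient } from 'pg'

// Shape assumed from the question's TransactionOnOrders type;
// fields other than orderId are placeholders.
interface TransactionOnOrders {
  orderId: string
  status: string
  amount: number
}

async function updateOrder(client: PoolClient, ord: TransactionOnOrders) {
  // The row lock acquired by UPDATE blocks a concurrent UPDATE of the
  // same row until this transaction finishes, so updates are serialized
  // per order rather than silently overlapping.
  await client.query(
    `UPDATE transactions_on_orders
        SET status = $2,
            amount = $3
      WHERE order_id = $1`,
    [ord.orderId, ord.status, ord.amount]
  )
}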
CodePudding user response:
Use INSERT ... ON CONFLICT to “upsert” the incoming rows. That is atomic and free from race conditions. For example:
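A minimal sketch, assuming the table has a unique constraint the upsert can target (here a hypothetical unique index on (order_id, transaction_id)); the table and column names are placeholders, not the question's actual schema:

import { PoolClient } from 'pg'

// Shape assumed from the question's TransactionOnOrders type;
// fields other than orderId are placeholders.
interface TransactionOnOrders {
  orderId: string
  transactionId: string
  status: string
  amount: number
}

async function upsertOrder(client: PoolClient, ord: TransactionOnOrders) {
  // Insert the row, or atomically overwrite the existing one if the
  // unique key already exists; no separate DELETE is needed.
  await client.query(
    `INSERT INTO transactions_on_orders (order_id, transaction_id, status, amount)
     VALUES ($1, $2, $3, $4)
     ON CONFLICT (order_id, transaction_id)
     DO UPDATE SET status = EXCLUDED.status,
                   amount = EXCLUDED.amount`,
    [ord.orderId, ord.transactionId, ord.status, ord.amount]
  )
}

EXCLUDED refers to the row that would have been inserted, so each statement either inserts or overwrites in a single atomic step, and two concurrent events for the same order can never leave a duplicate row behind.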