I have an Express based CRUD application which uses MongoDB as its DB. I have noticed that some concurrent writes fail if they use bulkWrite
and session
.
A simplified example looks like this:
import express from 'express';
import { v4 } from 'uuid';
import mongoose from 'mongoose';
const router = express.Router();
const mongoString = 'mongodb://127.0.0.1:27017/testMongo?directConnection=true';
const port = 3288;
const testId = mongoose.Types.ObjectId();
const Model = mongoose.model('Data', new mongoose.Schema({
whateverString: {
required: true,
type: String,
},
}));
mongoose.connect(mongoString);
const database = mongoose.connection;
database.on('error', (error) => {
console.log(error);
});
database.once('connected', async () => {
console.log('Database connected');
// Add test data if not exists
if (!await Model.exists({ _id: testId })) {
const data = new Model({
_id: testId,
whateverString: v4(),
});
await data.save();
}
});
const app = express();
app.use(express.json());
router.post('/', async (req, res) => {
const session = await mongoose.startSession();
session.startTransaction();
try {
await Model.bulkWrite([
{
updateOne: {
filter: {
_id: testId,
},
update: {
whateverString: v4(),
},
},
},
], { session });
await session.commitTransaction();
res.status(200).json({ allRight: true });
} catch (error) {
await session.abortTransaction();
console.log(error.message);
res.status(400).json({ message: error.message });
} finally {
session.endSession();
}
});
app.use('/', router);
app.listen(port, async () => {
console.log(`Server started at ${port}`);
});
What this does is:
- connecting to Mongo
- creating a test document
- creating a web server and one post route
- if the post route is called, the test document is updated with a random string in a
bulkWrite
and asession
Now take a simple client script which does three requests in parallel:
import fetch from 'node-fetch';
function doFetch() {
return fetch('http://localhost:3288', { method: 'post' });
}
async function myMain() {
try {
const promises = [doFetch(), doFetch(), doFetch()];
const response = await Promise.all(promises);
console.log('response', response.map(resp => ({ status: resp.status, text: resp.statusText })));
} catch (error) {
console.log(error);
}
}
myMain();
The result is: Only one DB query works, whereas the others fail with the error WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction.
I am rather new to MongoDB and Mongoose but in my understanding of databases, such a use-case should be fine. (In this example, the requests would overwrite each other and create chaos, but in the real-life use case that should not be a problem at all.)
Some observations I've made:
- Without passing
session
tobulkWrite
, everything works fine. - It is obviously some kind of race condition: sometimes two queries go through, sometimes only one.
- Setting
maxTransactionLockRequestTimeoutMillis
to 20000 did not help. - If I include the fetching in the server process itself (after
console.log('Server started ...
), then everything works fine. I cannot explain that to be honest.
What am I doing wrong? How can I solve that problem?
Appendix: The package.json
file of that example looks like this:
{
"name": "rest-api-express-mongo",
"dependencies": {
"express": "^4.17.3",
"mongoose": "^6.2.2",
"node-fetch": "^3.2.10",
"uuid": "^9.0.0"
},
"type": "module"
}
``
CodePudding user response:
Thanks to the comment provided by Marco Luzzara I was able to refactor and solve the issue via callbacks.
The code being now:
let retry = 0;
await database.getClient().withSession(async (session) => {
try {
await session.withTransaction(async () => {
await Model.bulkWrite([
{
updateOne: {
filter: {
_id: testId,
},
update: {
whateverString: v4(),
},
},
},
], { session });
await session.commitTransaction();
res.status(200).json({ allRight: true });
});
} catch (error) {
console.log(error.message);
res.status(400).json({ message: error.message });
retry = 1;
if (retry > 5) {
session.endSession();
}
}
});
Just for reference - the whole file looks now like:
import express from 'express';
import { v4 } from 'uuid';
import mongoose from 'mongoose';
const router = express.Router();
const mongoString = 'mongodb://127.0.0.1:27017/testMongo?directConnection=true';
const port = 3288;
const testId = mongoose.Types.ObjectId();
const Model = mongoose.model('Data', new mongoose.Schema({
whateverString: {
required: true,
type: String,
},
}));
mongoose.connect(mongoString);
const database = mongoose.connection;
database.on('error', (error) => {
console.log(error);
});
database.once('connected', async () => {
console.log('Database connected');
// Add test data if not exists
if (!await Model.exists({ _id: testId })) {
const data = new Model({
_id: testId,
whateverString: v4(),
});
await data.save();
}
});
const app = express();
app.use(express.json());
router.post('/', async (req, res) => {
let retry = 0;
await database.getClient().withSession(async (session) => {
try {
await session.withTransaction(async () => {
await Model.bulkWrite([
{
updateOne: {
filter: {
_id: testId,
},
update: {
whateverString: v4(),
},
},
},
], { session });
await session.commitTransaction();
res.status(200).json({ allRight: true });
});
} catch (error) {
console.log(error.message);
res.status(400).json({ message: error.message });
retry = 1;
if (retry > 5) {
session.endSession();
}
}
});
});
app.use('/', router);
app.listen(port, async () => {
console.log(`Server started at ${port}`);
});