Home > Software engineering >  MongoDB writing conflict for simple session-based parallel request
MongoDB writing conflict for simple session-based parallel request

Time:09-16

I have an Express based CRUD application which uses MongoDB as its DB. I have noticed that some concurrent writes fail if they use bulkWrite and session.

A simplified example looks like this:

import express from 'express';
import { v4 } from 'uuid';
import mongoose from 'mongoose';

const router = express.Router();

const mongoString = 'mongodb://127.0.0.1:27017/testMongo?directConnection=true';
const port = 3288;
const testId = mongoose.Types.ObjectId();

const Model = mongoose.model('Data', new mongoose.Schema({
  whateverString: {
    required: true,
    type: String,
  },
}));

mongoose.connect(mongoString);
const database = mongoose.connection;

database.on('error', (error) => {
  console.log(error);
});

database.once('connected', async () => {
  console.log('Database connected');

  // Add test data if not exists
  if (!await Model.exists({ _id: testId })) {
    const data = new Model({
      _id: testId,
      whateverString: v4(),
    });
    await data.save();
  }
});

const app = express();
app.use(express.json());

router.post('/', async (req, res) => {
  const session = await mongoose.startSession();
  session.startTransaction();
  try {
    await Model.bulkWrite([
      {
        updateOne: {
          filter: {
            _id: testId,
          },
          update: {
            whateverString: v4(),
          },
        },
      },
    ], { session });
    await session.commitTransaction();
    res.status(200).json({ allRight: true });
  } catch (error) {
    await session.abortTransaction();
    console.log(error.message);
    res.status(400).json({ message: error.message });
  } finally {
    session.endSession();
  }
});

app.use('/', router);

app.listen(port, async () => {
  console.log(`Server started at ${port}`);
});

What this does is:

  • connecting to Mongo
  • creating a test document
  • creating a web server and one post route
  • if the post route is called, the test document is updated with a random string in a bulkWrite and a session

Now take a simple client script which does three requests in parallel:

import fetch from 'node-fetch';

function doFetch() {
  return fetch('http://localhost:3288', { method: 'post' });
}

async function myMain() {
  try {
    const promises = [doFetch(), doFetch(), doFetch()];
    const response = await Promise.all(promises);
    console.log('response', response.map(resp => ({ status: resp.status, text: resp.statusText })));
  } catch (error) {
    console.log(error);
  }
}

myMain();

The result is: Only one DB query works, whereas the others fail with the error WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction.

I am rather new to MongoDB and Mongoose but in my understanding of databases, such a use-case should be fine. (In this example, the requests would overwrite each other and create chaos, but in the real-life use case that should not be a problem at all.)

Some observations I've made:

  • Without passing session to bulkWrite, everything works fine.
  • It is obviously some kind of race condition: sometimes two queries go through, sometimes only one.
  • Setting maxTransactionLockRequestTimeoutMillis to 20000 did not help.
  • If I include the fetching in the server process itself (after console.log('Server started ...), then everything works fine. I cannot explain that to be honest.

What am I doing wrong? How can I solve that problem?

Appendix: The package.json file of that example looks like this:

{
  "name": "rest-api-express-mongo",
  "dependencies": {
    "express": "^4.17.3",
    "mongoose": "^6.2.2",
    "node-fetch": "^3.2.10",
    "uuid": "^9.0.0"
  },
  "type": "module"
}
``

CodePudding user response:

Thanks to the comment provided by Marco Luzzara I was able to refactor and solve the issue via callbacks.

The code being now:

let retry = 0;

  await database.getClient().withSession(async (session) => {
    try {
      await session.withTransaction(async () => {
        await Model.bulkWrite([
          {
            updateOne: {
              filter: {
                _id: testId,
              },
              update: {
                whateverString: v4(),
              },
            },
          },
        ], { session });
        await session.commitTransaction();
        res.status(200).json({ allRight: true });
      });
    } catch (error) {
      console.log(error.message);
      res.status(400).json({ message: error.message });
      retry  = 1;
      if (retry > 5) {
        session.endSession();
      }
    }
  });

Just for reference - the whole file looks now like:

import express from 'express';
import { v4 } from 'uuid';
import mongoose from 'mongoose';

const router = express.Router();

const mongoString = 'mongodb://127.0.0.1:27017/testMongo?directConnection=true';
const port = 3288;
const testId = mongoose.Types.ObjectId();

const Model = mongoose.model('Data', new mongoose.Schema({
  whateverString: {
    required: true,
    type: String,
  },
}));

mongoose.connect(mongoString);
const database = mongoose.connection;

database.on('error', (error) => {
  console.log(error);
});

database.once('connected', async () => {
  console.log('Database connected');

  // Add test data if not exists
  if (!await Model.exists({ _id: testId })) {
    const data = new Model({
      _id: testId,
      whateverString: v4(),
    });
    await data.save();
  }
});

const app = express();
app.use(express.json());

router.post('/', async (req, res) => {
  let retry = 0;

  await database.getClient().withSession(async (session) => {
    try {
      await session.withTransaction(async () => {
        await Model.bulkWrite([
          {
            updateOne: {
              filter: {
                _id: testId,
              },
              update: {
                whateverString: v4(),
              },
            },
          },
        ], { session });
        await session.commitTransaction();
        res.status(200).json({ allRight: true });
      });
    } catch (error) {
      console.log(error.message);
      res.status(400).json({ message: error.message });
      retry  = 1;
      if (retry > 5) {
        session.endSession();
      }
    }
  });
});

app.use('/', router);

app.listen(port, async () => {
  console.log(`Server started at ${port}`);
});

  • Related