Timeout acquiring a connection when streaming results using Express

Time:10-23

We use the following code to stream the results of a query back to the client:

app.get('/events', (req, res, next) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_session: req.query.id_session })
      .stream()

    stream.pipe(JSONStream.stringify()).pipe(res)
  } catch (err) {
    next(err)
  }
})

While the code seems to have an excellent memory usage profile (stable, low memory usage), it causes random pool acquisition timeouts:

Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?

This happens in production at seemingly random intervals. Any idea why?

Answer:

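This happens because aborted requests (e.g. the client closes the browser mid-request) don't release the connection back to the pool. When the response closes early, pipe() only unpipes the source; it does not destroy the DB stream, so the stream keeps holding its connection.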

First, make sure you're on the latest knex, or at least v0.21.3, which introduced fixes to stream/pool handling.

From there, you have a couple of options:

Either use pipeline, which handles aborted requests correctly, instead of pipe:

const { pipeline } = require('stream')

app.get('/events', (req, res, next) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_session: req.query.id_session })
      .stream()

    // pipeline destroys every stream in the chain if any of them
    // errors or closes early, then calls the callback.
    return pipeline(stream, JSONStream.stringify(), res, err => {
      if (err) {
        return console.error('Pipeline failed with err:', err)
      }

      console.log('Pipeline ended successfully')
    })
  } catch (err) {
    next(err)
  }
})
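
The difference between pipe and pipeline when the destination goes away can be checked in isolation. The following is a minimal sketch using only Node's built-in stream module; makeSource, makeSink, sourceDestroyedAfterAbort, withPipe and withPipeline are hypothetical names (an endless Readable stands in for the DB stream, a Writable stands in for res). knex and Express are not involved, so it illustrates the stream behaviour only, not the pool itself.

```javascript
const { Readable, Writable, pipeline } = require('stream')

// Stand-in for the DB stream: produces rows for as long as it is read.
function makeSource() {
  let n = 0
  return new Readable({
    objectMode: true,
    read() {
      this.push({ id: n++ })
    }
  })
}

// Stand-in for the HTTP response: accepts rows asynchronously.
function makeSink() {
  return new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      setImmediate(callback)
    }
  })
}

// Connects source to sink, tears the sink down mid-stream (the "aborted
// request"), then resolves with whether the source was destroyed.
function sourceDestroyedAfterAbort(connect) {
  return new Promise(resolve => {
    const source = makeSource()
    const sink = makeSink()
    connect(source, sink)

    setTimeout(() => {
      sink.destroy()
      setTimeout(() => resolve(source.destroyed), 20)
    }, 20)
  })
}

const withPipe = (source, sink) => source.pipe(sink)
const withPipeline = (source, sink) => pipeline(source, sink, () => {})

sourceDestroyedAfterAbort(withPipe).then(destroyed =>
  console.log('pipe destroyed the source:', destroyed)
)
sourceDestroyedAfterAbort(withPipeline).then(destroyed =>
  console.log('pipeline destroyed the source:', destroyed)
)
```

On current Node versions this should report false for pipe and true for pipeline: pipe leaves the source paused but alive, which in the knex case would mean the connection is never handed back.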

Alternatively, listen to the close event on req and destroy the DB stream yourself, like so:

app.get('/events', (req, res, next) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_session: req.query.id_session })
      .stream()

    // Not listening to this event will crash the process if
    // stream.destroy(err) is called.
    stream.on('error', () => {
      console.log('Stream was destroyed')
    })

    req.on('close', () => {
      // stream.end() does not seem to work, only destroy().
      // Pass an Error object rather than a bare string.
      stream.destroy(new Error('Aborted request'))
    })

    stream.pipe(JSONStream.stringify()).pipe(res)
  } catch (err) {
    next(err)
  }
})
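
The wiring in this second option can also be tried without Express or a database. The sketch below is an assumption-laden illustration: destroyOnClose is a hypothetical helper (not part of knex or Express), a plain EventEmitter stands in for req, and an idle Readable stands in for the DB stream. It shows that destroy(err) marks the stream as destroyed and delivers the error to the 'error' listener.

```javascript
const { Readable } = require('stream')
const { EventEmitter } = require('events')

// Hypothetical helper: destroys `stream` once `req` emits 'close'.
function destroyOnClose(req, stream, onError) {
  // Without an 'error' listener, destroy(err) would surface as an
  // uncaught exception and take the process down.
  stream.on('error', onError)

  req.once('close', () => {
    if (!stream.destroyed) {
      stream.destroy(new Error('Aborted request'))
    }
  })
}

// Stand-ins for the request and the DB stream.
const fakeReq = new EventEmitter()
const fakeStream = new Readable({ read() {} })
const seen = []

destroyOnClose(fakeReq, fakeStream, err => seen.push(err.message))

// Simulate the client going away mid-request.
fakeReq.emit('close')
```

The `destroyed` check keeps the helper from destroying a stream that has already been torn down, which matters because 'close' can also fire after a request completes normally.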
