Home > OS >  How to do deferred jobs in Node API (Express)
How to do deferred jobs in Node API (Express)

Time:02-11

I have an API in Node with many endpoints. It works well but there is one that can have large requests run up to 1 hour of processing which can often break. I'm thinking of only sending back a url on a request where you can check the status of the request and then download it once it's ready. What would be the best way to handle a queue of jobs in Node for this?

Sample code below for the current endpoint

const router = express.Router();
const schema = joi.object().keys // This is a schema to validate the json input

router.post('/', async (req, res) => {
  let conn = await connect(); // Util method that connects to a Q/KDB server
  let request = req.body;

  joi.validate(request, schema, (err, _result) => {
    if (err) {
      res.status(400).send({ error: err['details'][0]['message'] });
    }
    else {
      let qRequest = buildRequest(request); // Util function to build request
      // Connect to Q/KDB server with node-q package and process request
      conn.k('api.process', qRequest, function(err, resp) {
        if (err) {
          log // Write to log
          res.status(400).send({ error: err['details'][0]['message']
        }
        else {
          res.status(200).send(resp);
        }
      });
    }
  });

});

EDIT: I have found that I basically just have to build a job queue with job ids corresponding to them. The package Bull seems to be good but I don't want to add another dependency such as Redis.

CodePudding user response:

Conceptually there are a couple ways to approach a problem like this:

  1. You can return a jobID and let the client query that jobID on some recurring basis using a URL that contains the jobID until they get a result (this sounds like what you envisioned)
  2. You can have the client connect a webSocket or socket.io connection to the server and when the result is done, the server can directly send the result over the websocket/socket.io connection.
  3. You can use Server Sent Events (SSE) to "push" the result to the client when it's done.

Here's the outline of a scheme for the first option above:

  1. Coins a unique jobID for each incoming job to process
  2. Creates a route for querying the status of a jobID
  3. Has a Map object that contains a list of jobs in process that is indexed by jobID
  4. Has a setInterval() that sweeps through the jobs in the job Map to remove any expired jobs (jobs where the client never came back to get them). You can set the frequency of that sweep and the amount of time that you keep the job.
  5. When a request comes in, it coins a new jobID, adds a "pending" job to the Map and returns back to the client a URL which they can query the job status on.
  6. When you eventually finish processing the job, the result is added to the job object and its status is changed to "complete".
  7. A route is added to query job status that includes the jobID.
  8. If, when queried, the job status is "complete", then the result is returned and the job is removed from the Map.
  9. If, when queried, the job status is "error", then the error is returned and the job is removed from the Map.
  10. If, when queried, the jobID is not present, 404 status is returned
  11. If, when queried, the job status is anything other than "complete" or "error", then the job.status and optional job.progress is returned. This allows your long running process to communicate back any progress if you want and you can use multiple status values if you want.

Here's code to illustrate the concept:

// A map of objects,
//   the key is the jobID
//   data is an object {status: "complete", result: someResult, timeStarted: someTime}
// If the job is not yet complete, status will be something other than "complete"
// and result will not yet exist
const jobs = new Map();

// check for expired jobs
const expirationInterval = 60 * 60 * 1000; // run expiration check once an hour
const expirationTime = 12 * 60 * 60 * 1000; // let jobs stay here for 12 hours
setInterval(() => {
    // accumulate an array of items to remove so we aren't modifying while iterating
    const expired = [];
    const now = Date.now();
    for (let [key, job] of jobs) {
        if (now - job.timeStarted > expirationTime) {
            expired.push(key);
        }
    }
    // now remove all expired jobs
    for (let key of expired) {
        jobs.delete(key);
    }
}, expirationInterval);

// make a job id that consists of current time (in ms) plus random number
// jobs can then be sorted or aged by time also
function makeJobID() {
    const base = Date.now().toString();
    const random = Math.random().toFixed(6).toString().slice(2);
    return base   "_"   random;
}

// fetch data for a jobID
// The job may either not exist any more,
//   may still be "pending" (or have some other status)
//   or may be "complete"
// Note: if this router is not at the top level, you will have to make
// this path line up with the URL you sent back to the client
router.get("/jobstatus/:jobID", (req, res) => {
    let job = jobs.get(req.params.jobID);
    if (!job) {
        res.sendStatus(404);
        return;
    }
    if (job.status === "complete") {
        // remove it from the jobs Map and send the data
        jobs.delete(req.params.jobID);
        res.send({
            status: "complete",
            result: job.result
        });
    } else if (job.status === "error") {
        // remove it from the jobs Map and send the data
        jobs.delete(req.params.jobID);
        res.send({
            status: "error",
            error: job.error
        });
    } else {
        // optional job.progress can also be communicated back.  This can be
        // a number, a string or an object of other data
        if (job.progress) {
            res.send({ status: job.status, progress: job.progress });
        } else {
            res.send({ status: job.status });
        }
    }
});


router.post('/', async (req, res) => {
    let conn;
    try {
        conn = await connect(); // Util method that connects to a Q/KDB server
    } catch (e) {
        console.log(e);
        res.sendStatus(500);
    }
    let request = req.body;

    joi.validate(request, schema, (err, _result) => {
        if (err) {
            res.status(400).send({ error: err['details'][0]['message'] });
        } else {
            // coin job id and add a job object to the jobs map
            const jobID = makeJobID();
            const job = {
                timeStarted: Date.now(),
                status: "pending"
            };
            jobs.set(jobID, job);

            // send response now that gives them a URL to query
            res.status(202).send({
                status: "Job submitted",
                url: `https://yourdomain.com/jobstatus/${jobID}` // pick whatever URL you want here
            });

            let qRequest = buildRequest(request); // Util function to build request
            // Connect to Q/KDB server with node-q package and process request
            conn.k('api.process', qRequest, function(err, resp) {
                if (err) {
                    // set job status to "error"
                    job.status = "error";
                    job.timeCompleted = Date.now();
                    try {
                        job.error = err['details'][0]['message'];
                    } catch (e) {
                        console.log(e);
                        job.error = "known";
                    }
                } else {
                    // job has finished, update the job
                    // we can update the job object directly because the job Map
                    // points at this same object
                    job.status = "complete";
                    job.timeCompleted = Date.now();
                    job.result = resp;
                }
            });
        }
    });
});
  • Related