Home > front end >  Javascript serial queue using async?
Javascript serial queue using async?

Time:10-12

I have the following code:

var async  = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

var serialQueue = async.queue(function(task, callback) { 
    console.log(`done with tasks...`);
    callback(); 
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    serialQueue.push(fx("1", 10000));
    serialQueue.push(fx("2", 2000));
    serialQueue.push(fx("3", 1000));
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();

When I run the code above I get this:

START FX: 1. waitTime: 10000
START FX: 2. waitTime: 2000
START FX: 3. waitTime: 1000
DRAINED!!!
done with tasks...
done with tasks...
done with tasks...
all items have been processed
DONE FX: 3
DONE FX: 2
DONE FX: 1

To do that I referenced this link:

https://caolan.github.io/async/v3/docs.html#queue

Could someone help me understand why the "serial" nature of the queue is not respected here? I set concurrency to 1 when I created serialQueue but, it looks like the queue isn't really serializing the execution of the three functions passed in.

How can I get this queue to execute the three functions in order, so have 2 wait for 1 and 3 wait for 2...? Also, why is drained called before all functions are actually done executing?

CodePudding user response:

There are a few things wrong here.

First: serialQueue.push(fx(...)) invokes fk immediately, does not wait for it to complete (you have no await) and places the return value (a promise) to your queue. You need to pass something that actually captures your intent so that it can be invoked later. There are lots of ways of doing this. You could pass arrays, in the form [func, arg1, arg2, ...], or you could pass functions in the form async () => { await func(arg1, arg2, ...) }, or you could use currying to produce a new function with predefined arguments, etc. Regardless...

Second: You don't do anything with task in your queue's handler. It's your job to do something, the queue just passes in the items you've given it. If you want the items to be functions, you have to invoke something, and await the promise it returns, if that's what you want the queue to do for you. Right now, you're ignoring task entirely.

Something like this:

var async  = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

var serialQueue = async.queue(async function(task, callback) {
    await task(); // Actually invoke the task
    console.log(`done with tasks...`);
    callback();
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    // Don't invoke the function, instead pass a thing that can be invoked later, in the queue's handler function
    serialQueue.push(async () => { await fx("1", 10000) });
    serialQueue.push(async () => { await fx("2", 2000) });
    serialQueue.push(async () => { await fx("3", 1000) });
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();

Output:

START FX: 1. waitTime: 10000
DONE FX: 1
done with tasks...
START FX: 2. waitTime: 2000
DONE FX: 2
done with tasks...
START FX: 3. waitTime: 1000
DONE FX: 3
done with tasks...
DRAINED!!!

CodePudding user response:

It looks to me that when you do

serialQueue.push(fx("1", 10000));

you're not queuing the execution of fx, but rather you're executing it right away and passing the returned value to serialQueue.push. I don't know the async library but it doesn't look like the thing you want to achieve.

Try changing it into something like

serialQueue.push(() => fx("1", 10000));

In that way you're passing a function, that will be queued and executed later, in order.

CodePudding user response:

The problem is that the worker function that one provides when creating an async.queue() is itself responsible for executing the task given it. And task in this context is indeterminate: it's nought but an object of some sort, the contract of which is left to you do define.

What you are doing each time you enqueue as task is invoking your async function which then starts executing immediately. Enqueuing them doesn't do anything.

This should do what you want to do:

const { queue } = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

const NOOP = () => {}

const worker = async (task, cb = () => {}) => {
  const {
    name: taskName,
    fn: work = NOOP,
  } = task;
  let status;

  console.log(`Worker: task ${taskName} received. execution begins...`);
  await work()
    .then( () => {
      status = "success";
      cb();
    })
    .catch( err => {
      status = "failure"
      cb(err);
    })
    .finally( () => {
      console.log(`Worker: task ${task.name} has completed with with ${status}`);
    });

};

const serialQueue = queue( worker, 1);

const createTask = (name, waitTime) => {
  return {
    name,
    async fn() {
      console.log(`task ${name}: started`);
      console.log(`task ${name}: waiting ${waitTime}ms...`);
      await timeout(waitTime);
      console.log(`task ${name}: ...wait complete.`);
      console.log(`task ${name}: finished`);
    }
  };
}

async function test() {
  serialQueue.push( createTask( '1', 10000 ) );
  serialQueue.push( createTask( '2',  2000 ) );
  serialQueue.push( createTask( '3',  1000 ) );
  await serialQueue.drain();
  console.log(`DRAINED!!!`);
}

test()
  .then(() => console.log('exited test'))
  .catch(err => console.log('Error!', err));

Executing the above code yields what you might expect:

Worker: task 1 received. execution begins...
task 1: started
task 1: waiting 10000ms...
task 1: ...wait complete.
task 1: finished
Worker: task 1 has completed with with success
Worker: task 2 received. execution begins...
task 2: started
task 2: waiting 2000ms...
task 2: ...wait complete.
task 2: finished
Worker: task 2 has completed with with success
Worker: task 3 received. execution begins...
task 3: started
task 3: waiting 1000ms...
task 3: ...wait complete.
task 3: finished
Worker: task 3 has completed with with success
DRAINED!!!
exited test
  • Related