Home > database >  Implementing a queue of function calls that runs in parallel to the rest of the code
Implementing a queue of function calls that runs in parallel to the rest of the code

Time:11-15

I'm currently building a program in JavaScript that makes requests to the Google Sheets API based on activity occurring in a Discord server (a messaging app). However, I've been running into the API's rate limits when multiple users perform the same action at the same time, which causes too many API requests in too short a time.

My idea to get around this is to implement a parallel queue of async function calls, so that whenever I want to make a request of the google API, I queue that function call, and another function or thread or something will keep checking this queue and if there is a function available, it will run that function, wait a little bit, and then check the queue again, and so on.

I'm struggling to figure out how to do this with regular asynchronous (async/await) programming. I've been referring to the following posts/pages, but they all seem focused on a predefined queue that is then dequeued in order - I want to be able to keep adding to the queue even after the functions have started running: "How do I store javascript functions in a queue for them to be executed eventually", "Semaphore-like queue in javascript?", and https://www.codementor.io/@edafeadjekeemunotor/building-a-concurrent-promise-queue-with-javascript-1ano2eof0v

Any help or guidance would be very appreciated, thank you!

CodePudding user response:

The simplest option would be to have a queue of promise-returning functions and poll it periodically.

Example:

// Pending work: promise-returning functions, run oldest first by `poll`.
const queue = [];

/**
 * Runs at most one queued task, then re-arms itself to run again in one
 * second, so tasks may be pushed onto `queue` at any time.
 *
 * A task that rejects (or resolves to something without an `id`) is logged
 * and dropped; the next poll is scheduled regardless, so a single failing
 * request cannot stop the loop.
 *
 * @returns {Promise<void>} settles once the current task (if any) finished
 *   and the next poll has been scheduled.
 */
async function poll() {
    console.log('POLL, queue.length=', queue.length);
    if (queue.length) {
        const task = queue.shift();
        try {
            const result = await task();
            console.log('RESULT', result.id);
        } catch (err) {
            // report and move on - rethrowing here would skip the
            // `setTimeout` below and end polling for good.
            console.error('TASK FAILED', err);
        }
    }
    setTimeout(poll, 1000);
}

// Running todo id; incremented each time a queued getter actually runs.
let n = 0;

/**
 * Enqueues one request for the next todo item.
 *
 * Only a function is pushed onto `queue`; the request itself (and the
 * increment of `n`) happens later, when the polling loop invokes it.
 */
function test() {
    const getter = () => fetch(
        'https://jsonplaceholder.typicode.com/todos/' + (++n)
    ).then(r => r.json());

    queue.push(getter);
}


// kick off the polling loop; `poll` re-schedules itself from then on.
poll()
<button onclick="test()">click many times</button>

CodePudding user response:

Besides what T.J. Crowder already mentioned about true parallelism in JavaScript, there is the special requirement of being able to continuously add to the queue (enqueue) after the (de)queue-based processing has started. Therefore I doubt there will be any solution based entirely on promises.

Thus, in case one does not want to go for permanently running "background" tasks based on setInterval/setTimeout, one has to implement an approach capable of handling callbacks.

One way would be to implement, e.g., a request class which is capable of dispatching its own (custom) events. This should be possible in both Node.js and Web API environments (browsers), since the latter provide/support EventTarget and the former has packages for it.

// helper functions for logging and creating a list
// of api requests
// event listener which logs an event's `detail` payload
// keyed by the event's `type`.
function logRequestQueueEvent(event) {
  const eventType = event.type;
  const payload = { detail: event.detail };

  console.log({ [eventType]: payload });
}
/**
 * Builds the next batch of todo-API URLs and advances the bound counter.
 *
 * `this` context aware: reads `callCount` (default 0) and `batchSize`
 * (default 12) from `this`, and writes the advanced `callCount` back,
 * so consecutive calls yield consecutive id ranges.
 *
 * @returns {string[]} `batchSize` URLs for the ids
 *   `callCount + 1` ... `callCount + batchSize`.
 */
function createFetchArrayFromBoundApiCallCount() {
  const { callCount = 0, batchSize = 12 } = this;

  const result = Array.from(
    { length: batchSize },
    (_, idx) =>
      `https://jsonplaceholder.typicode.com/todos/${ idx + callCount + 1 }`
  );
  this.callCount = callCount + batchSize;

  return result;
}

// entry point of the example: creates an instance of the
// custom request-queue class (which both fetches continuously
// and dispatches events), hooks the logging listener up to it
// and lets the page's request button feed it new URLs.
function main() {
  const fetchQueue = new ContinuouslyFetchingRequestQueue(5);

  // the only two event types of a queue instance one can subscribe to.
  for (const eventType of ['new-fetch', 'new-batch']) {
    fetchQueue.addEventListener(eventType, logRequestQueueEvent);
  }

  // the bound object carries the running call count across clicks.
  const counterState = { callCount: 0, batchSize: 12 };
  const nextUrlBatch = createFetchArrayFromBoundApiCallCount
    .bind(counterState);

  // `fetch` is a queue instance's sole publicly accessible method.
  const handleRequestClick = () => fetchQueue.fetch(nextUrlBatch());

  const requestButton = document.querySelector('[data-request]');
  requestButton.addEventListener('click', handleRequestClick);
}
// run the example setup.
main();
/* scale the page down slightly and drop the default body margin. */
body { zoom: .9; margin: 0; }
/* one button per line, fixed width, vertical spacing only. */
button { display: block; width: 5em; margin: 10px 0; }
/* NOTE(review): `.as-console-wrapper` is not defined in this snippet -
   presumably the hosting page's console overlay; confirm. The rule forces
   it to at least full height, 89% width, anchored at the top. */
.as-console-wrapper { min-height: 100%!important; width: 89%; top: 0; left: auto!important; }
<script>
/**
 * Splits an array into consecutive chunks of at most `chunkLength` items.
 *
 * The input array is left untouched; every chunk is a new array.
 * The sign of `chunkLength` is ignored and a fractional value is
 * truncated, so `-2` and `2.9` both produce chunks of two.
 *
 * @param {Array} [arr=[]] - the items to split.
 * @param {number} [chunkLength=arr.length] - maximum items per chunk;
 *   by default everything ends up in a single chunk.
 * @returns {Array[]} the chunks in original order; `[]` for an empty input.
 * @throws {RangeError} if `arr` is non-empty and `chunkLength` does not
 *   resolve to at least 1 (e.g. `0`, `NaN`, `0.5`) - such a size can
 *   never consume the input.
 */
function chunkArray(arr = [], chunkLength = arr.length) {
  const size = Math.trunc(Math.abs(chunkLength));
  const result = [];

  if (arr.length === 0) {
    return result;
  }
  // written as a negated `>=` so that `NaN` is rejected as well.
  if (!(size >= 1)) {
    throw new RangeError(
      `chunkLength must resolve to an integer >= 1, got ${ chunkLength }`
    );
  }
  for (let start = 0; start < arr.length; start += size) {
    result.push(arr.slice(start, start + size));
  }
  return result;
}

/**
 * `queue` instance related request and response handler, implemented
 * as `this` context aware function.
 *
 * Expects `this` to be `{ queue, array, requestsCount }` where `queue`
 * is the event-dispatching queue instance, `array` holds the URLs waiting
 * to be fetched and `requestsCount` is the chunk size.
 *
 * Dispatched events:
 *  - `new-fetch` - once, synchronously, before the pending URLs are
 *    taken; `detail.currentFetchQueue` is a copy of the pending URLs.
 *  - `new-batch` - once per chunk, after every request of that chunk
 *    has settled; `detail.responseData` lists, per request, either the
 *    rejection reason or the JSON-parsed value, and
 *    `detail.currentFetchQueue` is a copy of whatever is pending then.
 */
function handleRequestsAndResponsesAtBoundQueue() {
  const { queue, array, requestsCount } = this;

  queue
    .dispatchEvent(
      new CustomEvent('new-fetch', {
        detail: {
          currentFetchQueue: [...array],
        },
      }),
    );

  // decouple the `queue`'s bound `array` reference from the request
  // process about to start by working on a shallow copy ...
  const urlList = [...array];

  // ... and empty the bound `array` in place, so that URLs added
  // from now on belong to a later run.
  array.length = 0;

  // NOTE(review): every chunk's requests are started within this same
  // call; `requestsCount` groups the responses into batches but does
  // not delay one chunk until the previous one has settled.
  chunkArray(urlList, requestsCount)
    .forEach(chunkedUrlList => {

      // `allSettled` never rejects, hence no rejection handler.
      Promise
        .allSettled(
          chunkedUrlList.map(url =>
            fetch(url).then(response => response.json())
          )
        )
        .then(resultArray => {

          // declared locally - the undeclared assignment used before
          // created an implicit global (a ReferenceError in strict
          // mode and in modules).
          const responseData = resultArray
            // each settled promise either features a failing
            // `reason` or the JSON-parsed `value`.
            .map(result => result.reason ?? result.value);

          // since a `queue` instance inherits `EventTarget` behavior,
          // the mapped results are handed out as a custom event.
          queue
            .dispatchEvent(
              new CustomEvent('new-batch', {
                detail: {
                  responseData,
                  currentFetchQueue: [...array],
                },
              }),
            );
        });
    });
}

// queue instance method, written as `this` context aware function:
// appends the given URLs (single values and/or arrays of values) to
// the bound `array` and schedules the bound request handler.
function addRequestsToBoundQueue(...urlList) {
  // flatten one level, so that both `fetch(a, b)` and
  // `fetch([a, b])` end up as a plain list.
  const flatUrlList = urlList.flat();

  // guard: nothing to enqueue, nothing to schedule.
  if (flatUrlList.length === 0) {
    return;
  }
  this.array.push(...flatUrlList);

  // defer the request handling to a timer task in order
  // to keep this call non blocking.
  const processBoundQueue = handleRequestsAndResponsesAtBoundQueue
    .bind(this);

  setTimeout(processBoundQueue, 0);
}

/**
 * Custom request-queue class which is capable of both
 *  - fetching continuously
 *  - and dispatching events.
 *
 * The pending-URL array lives in the constructor's closure; the only
 * member added to an instance is its `fetch` method.
 */
class ContinuouslyFetchingRequestQueue extends EventTarget {
  /**
   * @param {number|string} [requestsCount] - chunk size handed to the
   *   request handler; parsed as a base-10 integer and clamped to the
   *   range 1..20. A value which does not parse to an integer
   *   (including a missing one) falls back to 1.
   */
  constructor(requestsCount) {
    super();

    const parsedCount = Number.parseInt(requestsCount, 10);

    // `Math.max`/`Math.min` pass `NaN` straight through, and a `NaN`
    // chunk size is unusable for splitting the URL list, so it gets
    // replaced by the lower bound before clamping.
    const chunkSize = Number.isNaN(parsedCount)
      ? 1
      : Math.max(1, Math.min(20, parsedCount));

    const array = [];
    const queue = this;

    // single/sole publicly accessible instance method.
    this.fetch = addRequestsToBoundQueue
      .bind({ queue, array, requestsCount: chunkSize });
  }
}
</script>

<button data-request>add 12 request</button>
<button onclick="console.clear();">clear console</button>

  • Related