Home > OS >  Promise and work thread chunking not working
Promise and work thread chunking not working

Time:02-04

I have this in my code where i am using promise to limit at max MAX_WORKERS workgroup at a time.

 const MAX_WORKERS = 3;
 let promises = [];
 let chunk = 3;
  let totalNodes = keyCountMap.length / 2;
  let doneCount = 0;

  for (let m = 0; m < keyCountMap.length; ) {
    let remaining = totalNodes - doneCount;
    let numbWorker = Math.min(chunk, remaining);
    for (let i = 0; i < numbWorker; i  ) {
      promises.push(createWorker(keyCountMap[m], keyCountMap[m   1]));
      doneCount  ;
      m  = 2;
      console.log(doneCount);
      if (doneCount % MAX_WORKERS == 0) {
        Promise.all(promises).then((response) => {
          console.log("one chunk finish");
        });
      }
    }
  }

and createWorker function is:

function createWorker(data1, data2) {
  return new Promise((resolve) => {
    let worker = new Worker();
    worker.onmessage = (event) => {
      postMessageRes = event.data;
      if (postMessageRes == 200) {
        worker.postMessage([
          nodePagesString,
          pagesString,
          copcString,
          data1,
          data2,
        ]);
      } else {
        workerCount  = 1;
        let position = postMessageRes[0];
        let color = postMessageRes[1];
        for (let i = 0; i < position.length; i  ) {
          positions.push(position[i]);
          colors.push(colors[i]);
        }
        if (workerCount == MAX_WORKERS) {
          worker.terminate();
          workerCount = 0;
          promises = [];
        }
        resolve(true);
      }
    };
  });
}

but the problem i have here is that it is not working like i wanted "3 work thread at a time and when it is done, spawn more thread/workers"

This is my console

enter image description here

and

enter image description here

which mean it is called at once and not after 3 threads which is count of chunks

Can anyone help me please on this

This is my worker code:

import { Copc, Key } from "copc";
import * as THREE from "three";
const color = new THREE.Color();
const colors = [];
let firstTime = true;
var nodePages, pages, receivedData, copc;
let x_min, y_min, z_min, x_max, y_max, z_max, width;
let positions = [];
let filename = "https://s3.amazonaws.com/data.entwine.io/millsite.copc.laz";

const readPoints = (id, getters) => {
  let returnPoint = getXyzi(id, getters);
  positions.push(
    returnPoint[0] - x_min - 0.5 * width,
    returnPoint[1] - y_min - 0.5 * width,
    returnPoint[2] - z_min - 0.5 * width
  );
  const vx = (returnPoint[3] / 65535) * 255;
  color.setRGB(vx, vx, vx);
  colors.push(color.r, color.g, color.b);
  firstTime = false;
};

function getXyzi(index, getters) {
  return getters.map((get) => get(index));
}

async function load() {
  // copc = await Copc.create(filename);
  // let scale = copc.header.scale[0];
  // [x_min, y_min, z_min, x_max, y_max, z_max] = copc.info.cube;
  // width = Math.abs(x_max - x_min);
  // // let center_x = (x_min   x_max) / 2;
  // // let center_y = (y_min   y_max) / 2;
  // // let center_z = (z_min   z_max) / 2;
  // receivedData = await Copc.loadHierarchyPage(
  //   filename,
  //   copc.info.rootHierarchyPage
  // );
  // nodePages = receivedData.nodes;
  // pages = receivedData.pages;
  postMessage(200);
}

async function loadData(nodes, pages, copc, myRoot, pointCount) {
  // console.log(copc, myRoot);
  const view = await Copc.loadPointDataView(filename, copc, myRoot);
  let getters = ["X", "Y", "Z", "Intensity"].map(view.getter);
  for (let j = 0; j < pointCount; j  = 1) {
    readPoints(j, getters);
  }
  postMessage([positions, colors]);
}

load();

onmessage = function (message) {
  let nodePages = message.data[0];
  let nodes = JSON.parse(nodePages);
  let pagesStr = message.data[1];
  let pages = JSON.parse(pagesStr);
  let copcStr = message.data[2];
  let copc = JSON.parse(copcStr);

  let mapIndex = message.data[3];
  let pointCount = message.data[4];
  let myRoot = nodes[mapIndex];
  // console.log(mapIndex);
  loadData(nodes, pages, copc, myRoot, pointCount);
};

CodePudding user response:

What's creating the confusion here is the mix of synchronous and asynchronous processes. In your first code block there are two for loops that iterate over the keyCountMap.length and numbWorker, respectively. The inner for loop sets up an asynchronous callback once the array of promises are resolved. However, it does not wait for the result as it is asynchronous.

So, the behavior you're seeing is the looping over all values in the for loops and queueing of multiple async calls. You can read documentation here about how the event loop works.

The logging for the doneCount values happens first and then your async callbacks happen afterward. You should be able to verify this by changing the console.log within the Promise callback by adding the doneCount to it:

/*
The use of JSON.parse and JSON.stringify is to preserve the initial value.
Otherwise it will show the current value of doneCount when the callback finalizes.
*/
console.log(`one chunk finish: ${JSON.parse(JSON.stringify(doneCount))}`);

Edit Added recommended adjustment.

const chunkWorkers = async () => {
 const MAX_WORKERS = 3;
 let promises = [];
 let chunk = 3;
  let totalNodes = keyCountMap.length / 2;
  let doneCount = 0;

  for (let m = 0; m < keyCountMap.length; ) {
    let remaining = totalNodes - doneCount;
    let numbWorker = Math.min(chunk, remaining);
    for (let i = 0; i < numbWorker; i  ) {
      promises.push(createWorker(keyCountMap[m], keyCountMap[m   1]));
      doneCount  ;
      m  = 2;
      console.log(doneCount);
      if (doneCount % MAX_WORKERS == 0) {
        await Promise.all(promises).then((response) => {
          console.log(`one chunk finish: ${doneCount}`);
        });
      }
    }
  }
};

CodePudding user response:

Start off by creating the maximum number of workers, and then have each of them create a new one when they are finished.

It looks something like this:

const mockWorker = async (ix) => new Promise(resolve => setTimeout(() => resolve(ix), 1000))
const MAX_WORKERS = 3;

function runWorkers(chunks){
  const startWorker = (chunk) => mockWorker(chunk).then(res => {
      console.log(`finished chunk ${res}`)
      if(chunks > 0){
        return startWorker(chunks--)
      }
  })
  const promises = []
  for (let i = 0; i < MAX_WORKERS; i  ) {
    promises.push(startWorker(chunks--))
  }
  Promise.all(promises).then(() => console.log('All finished'))
}

runWorkers(29)

  • Related