Batch request and process in parallel in Node with async iterators


I have a small function that logs get sent to. In it, I am trying to send each log to AWS CloudWatch, but I have an issue: once one log has been sent, I cannot send another until the first one has finished, because I need the `nextSequenceToken` from the response so the next log knows where to add itself. I believe async iterators are the key here, but I am not sure how to implement them in my code. Here is my current code, which fails after the first log is sent:

const build = require('pino-abstract-stream');

const stream = async (options) => {
    // Creates the AWS connection
    const client = await createClient();
 
    // Gets the first token
    let sequenceToken = await getInitSequenceToken(client);

    return build(function (source) {
        source.on("data", async function (obj) {
            // Fires for each log object forwarded to this worker thread; obj contains the log's fields
            const command = new PutLogEventsCommand({
                logGroupName: "api",
                logStreamName: `executive-${Config.env}-${Config.location}`,
                logEvents: [
                    {
                        message: obj.msg,
                        timestamp: obj.time,
                    },
                ],
                sequenceToken,
            });

            // Here I am sending the log to CloudWatch
            const response = await client.send(command);

            // Here I update the token, but this fails because the next log is already sending
            sequenceToken = response.nextSequenceToken;
        });
    });
};

CodePudding user response:

You can wait for the stream (source) to finish, store all the objects in an array, and then send them in one request. Since logEvents accepts an array of multiple events, that is probably the better solution here: each object carries its own timestamp, and they will all be grouped under one sequenceToken:

const build = require('pino-abstract-stream');

const stream = async (options) => {
  // Creates the AWS connection
  const client = await createClient();

  // Gets the first token
  let sequenceToken = await getInitSequenceToken(client);
  let storeStreamObjects = [];
    
  return build(function (source) {
    source.on("data", function (obj) {
      // The handler only buffers the object, so it does not need to be async
      storeStreamObjects.push({
        message: obj.msg,
        timestamp: obj.time,
      });
    });
    source.on("close", async () => {

      const command = new PutLogEventsCommand({
        logGroupName: "api",
        logStreamName: `executive-${Config.env}-${Config.location}`,
        logEvents: storeStreamObjects, // all objects are here
        sequenceToken,
      });

      return await client.send(command);

    })
  });
};
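If you do need to keep sending each log as it arrives (rather than waiting for the stream to close), another option is to chain each send onto the previous one with a promise queue, so each PutLogEvents call starts only after the prior one has returned its token. Here is a minimal, self-contained sketch of that idea; `fakeSend` is a stand-in for the real `client.send(command)` call and the integer tokens are just for illustration:

```javascript
let sequenceToken = 0;
let queue = Promise.resolve();
const sent = [];

// Stand-in for client.send(command): resolves after a random delay
// with the next sequence token, like PutLogEvents does.
function fakeSend(message, token) {
  return new Promise((resolve) =>
    setTimeout(() => {
      sent.push({ message, token });
      resolve({ nextSequenceToken: token + 1 });
    }, Math.random() * 10)
  );
}

// What the "data" handler would do: chain onto the previous send so
// the token is always up to date before the next request goes out.
function onData(msg) {
  queue = queue.then(async () => {
    const response = await fakeSend(msg, sequenceToken);
    sequenceToken = response.nextSequenceToken;
  });
}

["a", "b", "c"].forEach(onData);

queue.then(() => {
  console.log(sent.map((s) => s.token)); // logs [ 0, 1, 2 ]
});
```

Even though `fakeSend` resolves after random delays, the chaining guarantees the sends happen strictly in order, which is exactly the property the sequence token requires.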

CodePudding user response:

If the logs can be sent to the cloud later, you can do something like this:

const build = require("pino-abstract-stream");

const stream = async options => {
  // Creates the AWS connection
  const client = await createClient();

  // Gets the first token
  let sequenceToken = await getInitSequenceToken(client);

  const streamData = [];

  return build(function (source) {
    source.on("data", function (obj) {
      streamData.push(obj);
    });

    source.on("end", async function () {
      // Send the stored logs one at a time, awaiting each send so the
      // token is updated before the next request goes out
      for (const data of streamData) {
        const command = new PutLogEventsCommand({
          logGroupName: "api",
          logStreamName: `executive-${Config.env}-${Config.location}`,
          logEvents: [
            {
              message: data.msg,
              timestamp: data.time,
            },
          ],
          sequenceToken,
        });

        // Here I am sending the log to CloudWatch
        const response = await client.send(command);

        sequenceToken = response.nextSequenceToken;
      }
    });
  });
};
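On the async-iterator idea the question mentions: Node streams implement `Symbol.asyncIterator`, so the `source` stream can be consumed with `for await...of`, which naturally waits for each send to finish before pulling the next log and keeps the token current. Here is a minimal, self-contained sketch of that loop; `fakeSource` and `fakeSend` stand in for the real stream and AWS client:

```javascript
// Stand-in for the transport's source stream: any async iterable works
// the same way as a Node stream under for await...of.
async function* fakeSource() {
  yield { msg: "a", time: 1 };
  yield { msg: "b", time: 2 };
  yield { msg: "c", time: 3 };
}

// Stand-in for client.send(command): pretends to be a network call
// that returns the next sequence token.
async function fakeSend(event, token) {
  return { nextSequenceToken: token + 1 };
}

async function run(source) {
  let sequenceToken = 0;
  const tokensUsed = [];
  // The loop body fully completes (including the awaited send) before
  // the next log is pulled, so sequenceToken is always up to date.
  for await (const obj of source) {
    tokensUsed.push(sequenceToken);
    const response = await fakeSend(
      { message: obj.msg, timestamp: obj.time },
      sequenceToken
    );
    sequenceToken = response.nextSequenceToken;
  }
  return tokensUsed;
}

run(fakeSource()).then((tokens) => console.log(tokens)); // logs [ 0, 1, 2 ]
```

In the real transport this shape would be `build(async function (source) { for await (const obj of source) { /* build command, await client.send, update token */ } })`, assuming the abstract-transport package in use hands you an async-iterable source as pino's does.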