Read and modify data sequentially in node.js

I'm trying to read and modify some csv files. For one csv file, the code I wrote is the following:

const csv = require('csv-parser');
const fs = require('fs');
const results = [];
const resultsFiltered = [];

fs.createReadStream('csvTest/2005.csv')
    .pipe(csv())
    .on('data', (data) => results.push(data))
    .on('end', () => {
        // Filter results
        for (i=0; i<results.length; i++) {
            if (results[i]['Points:2'] == 0) {
                resultsFiltered.push([results[i]['Points:0'], results[i]['Points:1'], results[i]['displacement:2']]); 
            }  
        } 
        console.log('results filtered: ', resultsFiltered);    
    });

This works fine, but when I try to loop over several files, I get strange results. Here is the code:

const csv = require('csv-parser');
const fs = require('fs');

const dataFolder = 'csvTest/';

let results, resultsFiltered, stringified;

function filterData(_file) {
    results = [];
    resultsFiltered = [];
    console.log('filtering');

    fs.createReadStream(dataFolder + _file)
    .pipe(csv())
    .on('data', (data) => results.push(data))
    .on('end', () => {
        // Filter results
        for (i=0; i<results.length; i++) {
            if (results[i]['Points:2'] == 0) {
                resultsFiltered.push([results[i]['Points:0'], results[i]['Points:1'], results[i]['displacement:2']]); 
            }  
        } 
        console.log('done');
        return resultsFiltered;
    });
}

const filesList = fs.readdirSync(dataFolder);

function main() {
    for (i=0; i<filesList.length; i++) {
        console.log(filterData(filesList[i]));
    }
}

main();

I understand that this could be solved with async/await, but every way I tried to use it was unsuccessful. I always end up with the same console output as with the plain code above:

filtering
undefined
filtering
undefined
filtering
undefined
done
done
done

instead of the desired

filtering
done
resultsFiltered
filtering
done
resultsFiltered
filtering
done
resultsFiltered

How should async/await be used in this case?

CodePudding user response:

You get undefined because your function has no return statement. If you take a closer look, the return statement you wrote is actually inside a callback function. What you can do is promisify your function and use async/await.

const csv = require("csv-parser");
const fs = require("fs");

const dataFolder = "csvTest/";

let results, resultsFiltered, stringified;

function filterData(_file) {
  return new Promise(res => {
    results = [];
    resultsFiltered = [];
    console.log("filtering");

    fs.createReadStream(dataFolder + _file)
      .pipe(csv())
      .on("data", data => results.push(data))
      .on("end", () => {
        // Filter results
        for (i = 0; i < results.length; i++) {
          if (results[i]["Points:2"] == 0) {
            resultsFiltered.push([
              results[i]["Points:0"],
              results[i]["Points:1"],
              results[i]["displacement:2"]
            ]);
          }
        }
        console.log("done");
        res(resultsFiltered);
      });
  });
}

const filesList = fs.readdirSync(dataFolder);

async function main() {
  for (const file of filesList) {
    let result = await filterData(file);
    console.log(result);
  }
}

main();

Well, this should work.

CodePudding user response:

You will need to convert your operations to return a promise to be able to await the results.

P.S. Don't forget to handle errors by listening for the error event and rejecting the promise; a minimal sketch of that is shown after the code below.

function filterData(_file) {
  results = [];
  resultsFiltered = [];
  console.log('filtering');

  return new Promise((resolve) => {
    fs.createReadStream(dataFolder + _file)
      .pipe(csv())
      .on('data', (data) => results.push(data))
      .on('end', () => {
        // Filter results
        for (i = 0; i < results.length; i++) {
          if (results[i]['Points:2'] == 0) {
            resultsFiltered.push([
              results[i]['Points:0'],
              results[i]['Points:1'],
              results[i]['displacement:2'],
            ]);
          }
        }
        console.log('done');
        resolve(resultsFiltered)
      });
  });
}

const filesList = fs.readdirSync(dataFolder);

async function main() {
  for (i = 0; i < filesList.length; i++) {
    console.log(await filterData(filesList[i]));
  }
}
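
As a minimal sketch of the error handling mentioned in the P.S. (an assumed extension, not part of the original answer), the promise executor can also take a reject callback, the streams get error listeners, and the caller can catch failures per file:

const csv = require('csv-parser');
const fs = require('fs');

const dataFolder = 'csvTest/';

// Hypothetical variant of filterData() above, extended with error handling
function filterData(_file) {
  const results = [];
  const resultsFiltered = [];
  console.log('filtering');

  return new Promise((resolve, reject) => {
    fs.createReadStream(dataFolder + _file)
      .on('error', reject) // e.g. the file cannot be opened
      .pipe(csv())
      .on('error', reject) // e.g. the rows cannot be parsed
      .on('data', (data) => results.push(data))
      .on('end', () => {
        for (let i = 0; i < results.length; i++) {
          if (results[i]['Points:2'] == 0) {
            resultsFiltered.push([
              results[i]['Points:0'],
              results[i]['Points:1'],
              results[i]['displacement:2'],
            ]);
          }
        }
        console.log('done');
        resolve(resultsFiltered);
      });
  });
}

async function main() {
  const filesList = fs.readdirSync(dataFolder);
  for (const file of filesList) {
    try {
      console.log(await filterData(file));
    } catch (err) {
      console.log('failed to process', file, err); // keep going with the next file
    }
  }
}

main();

With this shape, a file that cannot be opened or parsed rejects its promise and is caught in the loop, so the remaining files are still processed.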

CodePudding user response:

There are multiple problems in this code.

The main logic issue is that these streams are all asynchronous and your code is not keeping track of when any of them finishes reading. Your return resultsFiltered; is inside the .on('end', ...) callback, so you're not returning anything from your filterData() function (which is why you get undefined); you're just returning from that callback, which goes nowhere.

Thus, when you loop over the files, you have lots of streams going at once, with no idea when they are all done and no way to get the data out of them.

You also have issues with undeclared local variables, which become accidental globals and can conflict with each other.

To solve the main logic issue, this is a useful spot to introduce a promise that will keep track of when each stream is done and let you get the data out as the resolved value of the promise or get an error out as the reject reason:

function filterData(_file) {
    const results = [];
    const resultsFiltered = [];

    return new Promise((resolve, reject) => {
        fs.createReadStream(dataFolder + _file)
            .pipe(csv())
            .on('data', (data) => results.push(data))
            .on('end', () => {
                // Filter results
                for (let i = 0; i < results.length; i++) {
                    if (results[i]['Points:2'] == 0) {
                        resultsFiltered.push([results[i]['Points:0'], results[i]['Points:1'], results[i]['displacement:2']]);
                    }
                }
                resolve(resultsFiltered);
            }).on('error', reject);
    });
}

Then, you can use that promise to keep track of things in your loop:

const filesList = fs.readdirSync(dataFolder);

async function main() {
    for (let i = 0; i < filesList.length; i++) {
        const result = await filterData(filesList[i]);
        console.log(result);
    }
}

main().then(() => {
    console.log("done");
}).catch(err => {
    console.log(err);
});

Note that this makes several other fixes to your code:

  1. It adds an error event handler on your stream which rejects the promise and thus gives you an opportunity to track and handle errors. Your original code was ignoring errors.

  2. This adds local declarations with const or let for all your variables, which is absolutely required. Your code was not declaring either i variable used in your loops, which makes them implicit globals that can conflict with one another.

  3. This sequences the streams so they aren't all in progress at once, and it allows you to keep your results in order. The code could also be written to run the streams in parallel (if enough resources are available to do that) by collecting all the promises and using Promise.all() to track them, as sketched below.
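
For reference, here is a minimal sketch of that parallel variant, assuming the same filterData() and filesList as in the code above:

async function mainParallel() {
    // Start all the streams at once; Promise.all() resolves when every one
    // has finished and keeps the results in the same order as filesList.
    const allResults = await Promise.all(filesList.map(file => filterData(file)));
    for (const result of allResults) {
        console.log(result);
    }
}

mainParallel().then(() => {
    console.log("done");
}).catch(err => {
    console.log(err);
});

If any single stream fails, Promise.all() rejects with that error, so the .catch() above still reports the first failure.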
