Node.js async/await streaming read issue(s)

I'm using Node.js and would like to process a file as a stream, record by record. The input file is a huge CSV which I'm parsing using PapaParse. Depending on the values in each record, I may generate an output file for that record.

So - single big file in, thousands of small files out.

When I attempt to run through multiple input files, it only works on the first file. The additional files seem to fall through the code without actually reading the input file.

I suspect this is something silly, but I've been staring at the code and it's not jumping out at me. I'm running node v18.3.0.

Here is a link to a code sandbox that illustrates the problem. CODE SANDBOX

index.js

import { generateFiles } from "./processFile.js";

(async () => {
  const infiles = ["mockdata.csv", "mockdata222.csv", "mockdata333.csv"];
  const outdirs = ["111", "222", "333"];
  for (let i = 0; i < infiles.length; i++) {
    console.log(`Process file ${infiles[i]} - start`);
    await generateFiles(infiles[i], `out/${outdirs[i]}`);
    console.log(`Process file ${infiles[i]} - end`);
  }
})();

console.log("*** Reached end of program ***");

processFile.js

import Papa from "papaparse";
import fs from "fs";
import path from "path";
import { Transform } from "stream";

const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, {
  header: true,
  beforeFirstChunk: function (chunk) {
    return chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "");
  }
});

const rowToPage = (outdir) => {
  let sequence = 0;
  return new Transform({
    objectMode: true,
    async transform(item, encoding, callback) {
      const seq = (++sequence).toString();
      if (sequence !== Number(item.id)) {
        console.log("error for item", item);
        throw new Error(`${seq} != ${item.id}`);
      }
      const outfile = path.join(outdir, `${seq.padStart(7, "0")}`);
      console.log("outfile", outfile);

      // sleep instead of fs.writeFileSync(outfile, rowdata)
      await new Promise((r) => setTimeout(r, 10));

      this.push(item);
      callback();
    }
  });
};

export async function generateFiles(inFile, outDir) {
  console.log("--- start generating files ---");
  console.log("inFile", inFile);
  console.log("outDir", outDir);
  try {
    const rs = fs.createReadStream(inFile, "utf8");
    const processPromise = new Promise((resolve, reject) => {
      rs.pipe(CSVParse)
        .pause()
        .pipe(rowToPage(outDir))
        .resume()
        .on("data", (record) => {
          //console.log("record", record);
        })
        .on("end", () => {
          console.log("read stream done", inFile);
          resolve("finished reading");
        })
        .on("error", (error) => {
          console.log("read stream error", error);
          reject(error);
        });
    });
    console.log("before readstream await");
    await processPromise;
    console.log("after readstream await");
  } catch (error) {
    console.log("process file error", error);
  }
  console.log("--- finished generating files ---");
}

Here's the output:

sandbox@sse-sandbox-f4qdrq:/sandbox$ node .
Process file mockdata.csv - start
--- start generating files ---
inFile mockdata.csv
outDir out/111
before readstream await
*** Reached end of program ***
outfile out/111/0000001
outfile out/111/0000002
outfile out/111/0000003
outfile out/111/0000004
outfile out/111/0000005
outfile out/111/0000006
read stream done mockdata.csv
after readstream await
--- finished generating files ---
Process file mockdata.csv - end
Process file mockdata222.csv - start
--- start generating files ---
inFile mockdata222.csv
outDir out/222
before readstream await
read stream done mockdata222.csv
after readstream await
--- finished generating files ---
Process file mockdata222.csv - end
Process file mockdata333.csv - start
--- start generating files ---
inFile mockdata333.csv
outDir out/333
before readstream await
read stream done mockdata333.csv
after readstream await
--- finished generating files ---
Process file mockdata333.csv - end
sandbox@sse-sandbox-f4qdrq:/sandbox$

CodePudding user response:

You only ever create a single parser stream:

const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, { ... });

That stream ends, and is destroyed, after the first file has been piped through it, so piping the later files into it emits "end" immediately without producing any data (which matches your log). You need to create a new parser stream per file:

const CSVParse = () => Papa.parse(...);
...
rs.pipe(CSVParse())
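
Putting that together, here is a minimal sketch of the corrected processFile.js (makeCSVParse is just an illustrative name for the factory; rowToPage is unchanged from your question, and the pause()/resume() calls are dropped since pipe() already starts the stream flowing):

import Papa from "papaparse";
import fs from "fs";

// A factory instead of a shared instance: every call returns a brand-new
// parser stream, so one file ending the parser cannot affect the next file.
const makeCSVParse = () =>
  Papa.parse(Papa.NODE_STREAM_INPUT, {
    header: true,
    beforeFirstChunk: (chunk) =>
      chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "")
  });

export async function generateFiles(inFile, outDir) {
  const rs = fs.createReadStream(inFile, "utf8");
  await new Promise((resolve, reject) => {
    rs.pipe(makeCSVParse())      // fresh parser for every file
      .pipe(rowToPage(outDir))   // rowToPage as defined in the question
      .on("data", () => {})      // consume records so the stream keeps flowing
      .on("end", () => resolve("finished reading"))
      .on("error", reject);
  });
}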
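
As an aside, not part of the fix itself: on Node v15 and later (you're on v18.3.0), the promise form of stream.pipeline can replace the hand-rolled Promise wrapper; it pipes the streams together, resolves when the chain finishes, and propagates errors and cleanup through every stream in it. A sketch, assuming makeCSVParse and rowToPage from above:

import fs from "fs";
import { pipeline } from "stream/promises";

export async function generateFiles(inFile, outDir) {
  // pipeline() resolves once the whole chain has finished, and rejects
  // (destroying the other streams) if any stream in it errors.
  await pipeline(
    fs.createReadStream(inFile, "utf8"),
    makeCSVParse(),
    rowToPage(outDir),
    // Terminal consumer: drain the transformed records so data flows.
    async function (records) {
      for await (const record of records) {
        // per-record work already happened inside rowToPage
      }
    }
  );
}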