I'm using Node.js and would like to process a file as a stream, record by record. The input file is a huge CSV which I'm parsing using PapaParse. Depending on the values in each record, I may generate an output file for that record.
So - single big file in, thousands of small files out.
When I attempt to run through multiple input files, it only works on the 1st file. The additional files seem to fall through the code without the input file actually being read.
I suspect this is something silly, but I've been staring at the code and it's not jumping out at me. I'm running node v18.3.0.
Here is a link to a code sandbox that illustrates the problem. CODE SANDBOX
index.js
import { generateFiles } from "./processFile.js";
(async () => {
const infiles = ["mockdata.csv", "mockdata222.csv", "mockdata333.csv"];
const outdirs = ["111", "222", "333"];
for (let i = 0; i < infiles.length; i++) {
console.log(`Process file ${infiles[i]} - start`);
await generateFiles(infiles[i], `out/${outdirs[i]}`);
console.log(`Process file ${infiles[i]} - end`);
}
})();
console.log("*** Reached end of program ***");
processFile.js
import Papa from "papaparse";
import fs from "fs";
import path from "path";
import { Transform } from "stream";
const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, {
header: true,
beforeFirstChunk: function (chunk) {
return chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "");
}
});
const rowToPage = (outdir) => {
let sequence = 0;
return new Transform({
objectMode: true,
async transform(item, encoding, callback) {
const seq = (++sequence).toString();
if (sequence !== item.id) {
console.log("error for item", item);
throw new Error(`${seq} != ${item.id}`);
}
const outfile = path.join(outdir, `${seq.padStart(7, "0")}`);
console.log("outfile", outfile);
// sleep instead of fs.writeFileSync(outfile, rowdata)
await new Promise((r) => setTimeout(r, 10));
this.push(item);
callback();
}
});
};
export async function generateFiles(inFile, outDir) {
console.log("--- start generating files ---");
console.log("inFile", inFile);
console.log("outDir", outDir);
try {
const rs = fs.createReadStream(inFile, "utf8");
const processPromise = new Promise((resolve, reject) => {
rs.pipe(CSVParse)
.pause()
.pipe(rowToPage(outDir))
.resume()
.on("data", (record) => {
//console.log("record", record);
})
.on("end", () => {
console.log("read stream done", inFile);
resolve("finished reading");
})
.on("error", (error) => {
console.log("read stream error", error);
reject();
});
});
console.log("before readstream await");
await processPromise;
console.log("after readstream await");
} catch (error) {
console.log("process file error", error);
}
console.log("--- finished generating files ---");
}
Here's the output:
sandbox@sse-sandbox-f4qdrq:/sandbox$ node .
Process file mockdata.csv - start
--- start generating files ---
inFile mockdata.csv
outDir out/111
before readstream await
*** Reached end of program ***
outfile out/111/0000001
outfile out/111/0000002
outfile out/111/0000003
outfile out/111/0000004
outfile out/111/0000005
outfile out/111/0000006
read stream done mockdata.csv
after readstream await
--- finished generating files ---
Process file mockdata.csv - end
Process file mockdata222.csv - start
--- start generating files ---
inFile mockdata222.csv
outDir out/222
before readstream await
read stream done mockdata222.csv
after readstream await
--- finished generating files ---
Process file mockdata222.csv - end
Process file mockdata333.csv - start
--- start generating files ---
inFile mockdata333.csv
outDir out/333
before readstream await
read stream done mockdata333.csv
after readstream await
--- finished generating files ---
Process file mockdata333.csv - end
sandbox@sse-sandbox-f4qdrq:/sandbox$
CodePudding user response:
You only ever create a single parser stream:
const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, { ... });
That stream finishes after the first file has been read, and an ended stream can't be piped into or read from again, which is why the second and third files appear to fall straight through. You need to create a new parser stream per file:
const CSVParse = () => Papa.parse(...);
...
rs.pipe(CSVParse())
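For example, here is a minimal sketch of generateFiles with that change, keeping rowToPage exactly as in the question (the .pause()/.resume() calls are dropped, since attaching a "data" listener is what keeps the stream flowing):

const CSVParse = () =>
  Papa.parse(Papa.NODE_STREAM_INPUT, {
    header: true,
    beforeFirstChunk: (chunk) =>
      chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "")
  });

export async function generateFiles(inFile, outDir) {
  const rs = fs.createReadStream(inFile, "utf8");
  await new Promise((resolve, reject) => {
    rs.pipe(CSVParse())        // a fresh parser stream for this file
      .pipe(rowToPage(outDir)) // rowToPage already creates a new Transform per call
      .on("data", () => {})    // consume the output so data keeps flowing
      .on("end", () => resolve("finished reading"))
      .on("error", reject);
  });
}

Because a new parser is created on every call to generateFiles, each input file gets its own stream, and the second and third files are parsed the same way as the first.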