As you can see, I have a JS script that takes a .csv file and calls an async function for every row (cycling through 4 different functions). The problem is that I need to wait for the function in the i-th iteration to finish before I proceed to the (i+1)-th iteration.
const csv = require('csv-parser');
const fs = require('fs');

var i = 1;

fs.createReadStream('table.csv')
  .pipe(csv())
  .on('data', (row) => {
    switch (i % 4) {
      case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    }
    i++;
  })
  .on('end', () => {
    console.log('CSV file successfully processed');
  });
async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  ...
}

async function org2createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  ...
}

async function org3createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  ...
}

async function org4createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
  ...
}
How can I get what I want? Hope my question is clear enough!
CodePudding user response:
The readStream you are using here is asynchronous: .on(event, callback) triggers every time a new piece of data is read, independently of any previously triggered callback. In other words, the execution of the callback function does not affect the read process; it is run in parallel, every time the event is received.
This means that if the callback executes asynchronous code, you may very well end up in a situation where multiple instances of it are still running by the time the next 'data' event is received.
Note: this holds true for any event, including the 'end' event.
If you were to use async/await inside the callback, it would only make the internal logic of that function sequential. It would still not affect the rate at which your data is read.
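To see this, here is a minimal sketch, with a setTimeout standing in for the real async work: for a small file, every 'row received' line prints before any 'row processed' line, because the stream keeps emitting rows while the awaited work is still pending.

const csv = require('csv-parser');
const fs = require('fs');

const stream = fs.createReadStream('table.csv').pipe(csv());

stream.on('data', async (row) => {
  console.log('row received');                  // fires immediately for every row
  await new Promise(r => setTimeout(r, 1000));  // stands in for the real async work
  console.log('row processed');                 // resolves ~1s later, overlapping other rows
});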
To get sequential processing, you will want to both use async/await in the callback (to make it internally sequential) and have the callback manually pause and resume the read operation happening in parallel:
const csv = require('csv-parser');
const fs = require('fs');

let i = 1;
const stream = fs.createReadStream('table.csv').pipe(csv());

stream.on('data', async (row) => {
  // pause the overall stream until this row is processed
  stream.pause();
  // process the row
  switch (i % 4) {
    case 1: await org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    case 2: await org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    case 3: await org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
    case 0: await org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
  }
  i++;
  // resume the overall stream
  stream.resume();
});

stream.on('end', () => {
  // it is now guaranteed that no instance of the callback is still running in parallel when this event fires
  console.log('CSV file successfully processed');
});
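As a side note, on Node.js 10 or later, readable streams are also async iterables, so a for await...of loop gives the same one-row-at-a-time behavior without manual pause/resume. A minimal sketch, using a lookup array in place of the switch above:

const csv = require('csv-parser');
const fs = require('fs');

// cycles org1 -> org2 -> org3 -> org4, matching the counter starting at 1 in the switch above
const creators = [org4createPatient, org1createPatient, org2createPatient, org3createPatient];

async function processRows() {
  const rows = fs.createReadStream('table.csv').pipe(csv());
  let i = 1;
  for await (const row of rows) {
    // each iteration only starts after the previous row has been fully processed
    await creators[i % 4](row.patientId, row.FirstName, row.LastName, row.Age, row.Sex,
      row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG,
      row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease);
    i++;
  }
  console.log('CSV file successfully processed');
}

processRows().catch(console.error);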
CodePudding user response:
The solution below uses the iter-ops library, which is very efficient in this case, because pipe(csv()) returns an AsyncIterable, so it should be processed accordingly.
Since you do not care about what those processing functions return, we can simply throttle the processing of each row:
const {pipe, throttle, onEnd} = require('iter-ops');
const csv = require('csv-parser');
const fs = require('fs');

const asyncIterable = fs.createReadStream('table.csv').pipe(csv());

const i = pipe(
  asyncIterable,
  throttle(async (row, index) => {
    // index is zero-based here, so shift it by 1 to match the original counter starting at 1
    switch ((index + 1) % 4) {
      case 1: await org1createPatient(row.patientId, ...); break;
      case 2: await org2createPatient(row.patientId, ...); break;
      case 3: await org3createPatient(row.patientId, ...); break;
      case 0: await org4createPatient(row.patientId, ...); break;
      default: break;
    }
  }),
  onEnd(s => {
    console.log(`Finished in ${s.duration}ms`);
  })
);
async function processCSV() {
  // this triggers the iteration:
  for await (const a of i) {
    // iterating through the entire CSV
  }
}

processCSV();
P.S. I'm the author of iter-ops.