Right now I am trying to read a file with lots of lines, and I need to insert each line into the database in a streaming manner. Is this possible?
This is what I have currently:
async function insertToDb(data) {
  await repository.save(new Entity("id", "name", "address"));
}

return new Promise((resolve) => {
  Readable.from(buffer)
    .on('data', async (data) => await insertToDb(data))
    .on('end', () => {
      resolve(1);
    });
});
The code above does read all the lines in the file, but the asynchronous database operations were executed AFTER resolve(1) was called. How can I ensure that all database operations have finished before I resolve the promise?
CodePudding user response:
The Promise returned by the data handler is going unused. You'll need to use them all somehow - such as by pushing each one to an array outside the callback, then calling Promise.all inside the end handler.
return new Promise((resolve) => {
  const dbPromises = [];
  Readable.from(buffer)
    .on('data', (data) => {
      dbPromises.push(insertToDb(data));
    })
    .on('end', () => {
      Promise.all(dbPromises).then(() => resolve(1));
    });
});
You also might consider adding an error handler that rejects the Promise, so that the caller can see that something went wrong if something does go wrong.
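As a rough sketch of that suggestion - assuming the snippet above lives inside a function (called insertAllLines here, a made-up name) and that insertToDb rejects when an insert fails - the reject parameter and the 'error' handler are the only additions:

const { Readable } = require('stream');

// Hypothetical wrapper around the snippet above; insertToDb is the helper from the question.
function insertAllLines(buffer) {
  return new Promise((resolve, reject) => {
    const dbPromises = [];
    Readable.from(buffer)
      .on('data', (data) => {
        dbPromises.push(insertToDb(data));
      })
      .on('end', () => {
        // Reject if any insert failed, otherwise resolve as before.
        Promise.all(dbPromises).then(() => resolve(1)).catch(reject);
      })
      // Surface stream errors (e.g. a failed read) to the caller as well.
      .on('error', reject);
  });
}

// The caller can now await the whole import and catch failures in one place.
async function importFile(buffer) {
  const result = await insertAllLines(buffer); // resolves with 1 once every insert is done
  console.log('all inserts finished:', result);
}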
CodePudding user response:
If you MUST wait for the stream to end before calling resolve(), but don't want to store all the promises in an array for Promise.all and defeat the purpose of streams, there are two ways I would go about it.
Method 1 (efficient but risky)
return new Promise((resolve) => {
  let dbPromise;
  Readable.from(buffer)
    .on('data', (data) => {
      dbPromise = insertToDb(data);
    })
    .on('end', () => {
      dbPromise.then(() => resolve(1));
    });
});
This keeps continuously updating dbPromise up until the last line of the file, at which point on('end') fires and waits only for dbPromise (i.e. the insert for the last line of the file).
But this comes with a risk. Suppose, for whatever reason, the last request finishes before the second-to-last request (this has happened to me many times with HTTP requests). The outer promise would then already be resolved while the second-to-last request is still in flight. This may or may not be something worth worrying about, depending on your case, but the risk is there.
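To make the risk concrete, here is a small standalone sketch (with setTimeout stand-ins instead of real database calls, so the names and delays are made up): the promise created last resolves first, so waiting only on the most recent promise does not guarantee the earlier one has finished.

const fakeInsert = (ms, label) =>
  new Promise((resolve) => setTimeout(() => {
    console.log(`${label} finished`);
    resolve(label);
  }, ms));

let dbPromise;
dbPromise = fakeInsert(50, 'second-to-last insert'); // slow
dbPromise = fakeInsert(10, 'last insert');           // fast

// Mimics Method 1: only the most recently assigned promise is awaited.
dbPromise.then(() => console.log('resolved, but the slow insert is still pending'));
// Logs: "last insert finished", then "resolved, ...", then "second-to-last insert finished"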
Method 2 (not as performant)
return new Promise((resolve) => {
  let unfinishedPromises = [];
  Readable.from(buffer)
    .on('data', (data) => {
      const dbPromise = insertToDb(data);
      unfinishedPromises.push(dbPromise);
      dbPromise.then(() => {
        // Remove dbPromise from unfinishedPromises when dbPromise completes.
        unfinishedPromises.splice(unfinishedPromises.indexOf(dbPromise), 1);
      });
    })
    .on('end', () => {
      Promise.all(unfinishedPromises).then(() => resolve(1));
    });
});
Pretty self-explanatory. Promises are added to unfinishedPromises, and if they complete before on('end') is called, they're removed from the array. When on('end') finally runs, it calls Promise.all on whatever remains in unfinishedPromises.
This eliminates the risk in Method #1. The "stream approach" is still somewhat broken, but the impact is limited because completed promises are removed as you go, so the size of unfinishedPromises is kept to a minimum.
Hope this helps you :). Stay coding.