I need to convert a DB table to a CSV report.
If I fetch the entire table with one query, the application crashes because it runs out of memory. So I decided to query the data from the table in batches of 100 rows, convert each row into a line of the report, and write it into a stream that is piped to an Express response.
It all happens roughly like this:
DB query
const select100Users = (maxUserCreationDateStr) => {
  return db.query(`
    SELECT *
    FROM users
    WHERE created_at < to_timestamp(${maxUserCreationDateStr})
    ORDER BY created_at DESC
    LIMIT 100
  `);
};
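(As an aside, interpolating the value straight into the SQL string invites injection. If db here is node-postgres — an assumption on my part — a parameterized version would look like this:

const select100Users = (maxUserCreationDateStr) => {
  // $1 is bound by the driver, so the value is never spliced into the SQL text
  return db.query(
    `SELECT * FROM users
     WHERE created_at < to_timestamp($1)
     ORDER BY created_at DESC
     LIMIT 100`,
    [maxUserCreationDateStr]
  );
};

This doesn't affect the streaming problem below, but it is worth fixing either way.)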
stream initialisation
const { PassThrough } = require('stream');

const getUserReportStream = () => {
  const stream = new PassThrough();
  writeUserReport(stream).catch((e) => stream.emit('error', e));
  return stream;
};
piping the stream to the Express response
app.get('/report', (req, res) => {
  const stream = getUserReportStream();
  res.setHeader('Content-Type', 'application/vnd.ms-excel');
  res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
  stream.pipe(res);
});
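Note that pipe does not propagate errors from the source stream to the destination, so the error re-emitted in getUserReportStream will not end the response by itself. A handler along these lines (a sketch) avoids a request that hangs forever on failure:

stream.on('error', (err) => {
  console.error(err);
  // headers may already be sent, so the only safe move is to tear down the connection
  res.destroy(err);
});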
and finally, how I write data to the stream
const writeUserReport = async (stream) => {
  let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000);
  let flag = true;

  stream.write(USER_REPORT_HEADER);

  while (flag) {
    const rows100 = await select100Users(maxUserCreationDateGlobal);
    console.log(rows100.length);

    if (rows100.length === 0) {
      flag = false;
    } else {
      let maxUserCreationDate = maxUserCreationDateGlobal;

      const users100 = await Promise.all(
        rows100.map((r) => {
          const created_at = r.created_at;
          const createdAt = new Date(created_at);
          if (created_at && createdAt.toString() !== 'Invalid Date') {
            const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000);
            maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber);
          }
          return mapUser(r); // returns a promise
        })
      );

      users100.forEach((u) => stream.write(generateCsvRowFromUser(u)));

      maxUserCreationDateGlobal = maxUserCreationDate;

      if (rows100.length < 100) {
        flag = false;
        console.log('***');
      }
    }
  }

  console.log('end');
  stream.end();
};
as a result I see this output in the console:
100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87 // 587
***
end
But in the downloaded file I get 401 lines (the first one with USER_REPORT_HEADER). It feels like stream.end()
closes the stream before all values are read from it.
I tried using a BehaviorSubject from RxJS instead of PassThrough in a similar way; the result is the same.
How can I wait until all the data I have written to the stream has been read from it?
Or maybe someone can recommend an alternative way to solve this problem.
CodePudding user response:
stream.write accepts a callback as its second (or third) parameter, which is invoked once the write operation has finished. You shouldn't keep writing while a previous write is still pending: when write returns false, wait for the callback (or the 'drain' event) before writing again.
So in general I'd suggest making this whole function async and wrapping every call to stream.write in a Promise, like this:
await new Promise((resolve, reject) => {
  stream.write(data, (error) => {
    if (error) {
      reject(error);
      return;
    }
    resolve();
  });
});
Obviously it would make sense to extract this to some method.
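For example (a minimal sketch; the helper name writeAsync is mine):

const writeAsync = (stream, data) =>
  new Promise((resolve, reject) => {
    stream.write(data, (error) => (error ? reject(error) : resolve()));
  });

Then in writeUserReport every write gets awaited, and the forEach becomes a for...of loop so the awaits actually sequence the writes:

await writeAsync(stream, USER_REPORT_HEADER);
// ...
for (const u of users100) {
  await writeAsync(stream, generateCsvRowFromUser(u));
}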
EDIT: Additionally, I don't think that's the actual problem. I assume your HTTP connection is just timing out before all the fetching has completed, so the server eventually closes the stream once the timeout deadline is reached.
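If that's what is happening, one way to rule it out (a sketch; passing 0 disables the inactivity timeout on the underlying socket) is to lift the timeout for this route:

app.get('/report', (req, res) => {
  // disable the socket inactivity timeout for this long-running download
  req.setTimeout(0);
  res.setTimeout(0);

  const stream = getUserReportStream();
  res.setHeader('Content-Type', 'application/vnd.ms-excel');
  stream.pipe(res);
});

With the awaited writes above, data also keeps flowing to the client in small chunks, which on its own makes an idle-timeout much less likely.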