After creating a stream (A), creating another stream (B) and reading stream (B), the reading process stops from the stream (A). How can I solve this problem?
Node.js v14.18.1
import * as readline from 'readline';
import { Readable } from 'stream';
async function main() {
const streamA = Readable.from('a');
const readerA = readline.createInterface({
input: streamA,
crlfDelay: Infinity
});
var stopCase = false;
if (stopCase) {
const streamB = Readable.from('b');
const readerB = readline.createInterface({
input: streamB,
crlfDelay: Infinity
});
console.log('readB');
for await (const line of readerB) {
console.log(line);
}
}
console.log(`readerA.closed = ${'closed' in readerA}`);
console.log('readA');
for await (const line of readerA) {
console.log(line);
}
console.log('success');
}
main();
Output(stopCase=true):
readB
b
readerA.closed = true
readA
Output(stopCase=false):
readerA.closed = false
readA
a
success
CodePudding user response:
The issue is that as soon as you do this:
const readerA = readline.createInterface({
input: streamA,
crlfDelay: Infinity
});
Then, streamA
is now ready to flow and readerA
is ready to generate events as soon as you hit the event loop. When you go into the stopCase
block and hit the for await (const line of readerB)
, that will allow streamA
to flow which will allow readerA
to fire events.
But, you aren't listening for the readerA events when they fire and thus it finishes the streamA
content it had while you aren't listening.
You can see how it works better if you don't create readerA
until after you're done with the stopCase
block. Because then streamA
and readerA
aren't yet flowing when you hit the await
inside of the stopCase
block.
This is what I would call a growing pain caused by trying to add promises onto the event driven streams. If you leave the stream in a flowing state and you were going to use await
to read those events, but you then await
some other promise, all your events on that first stream fire when you aren't yet listening. It doesn't know you're waiting to use await
on it. You set it up to flow so as soon as the interpreter hits the event loop, it starts flowing, even though you aren't listening with await
.
I've run into this before in my own code and the solution is to not set a stream up to flow until you're either just about to use await
to read it or until you have a more traditional event handler configured to listen to any events that flow. Basically, you can't configure two streams for use with for await (...)
at the same time. Configure one stream, use it with your for await (...)
, then configure the other. And, be aware of any other promises used in your processing of the for await (...)
loop too. There are lots of ways to goof up when using that structure.
In my opinion, it would work more reliably if a stream was actually put in a different state to be used with promises so it will ONLY flow via the promise interface. Then, this kind of thing would not happen. But, I'm sure there are many challenges with that implementation too.
For example, if you do this:
import * as readline from 'readline';
import { Readable } from 'stream';
async function main() {
var stopCase = true;
console.log(`stopCase = ${stopCase}`);
if (stopCase) {
const streamB = Readable.from('b');
const readerB = readline.createInterface({
input: streamB,
crlfDelay: Infinity
});
console.log('readB');
for await (const line of readerB) {
console.log(line);
}
}
const streamA = Readable.from('a');
const readerA = readline.createInterface({
input: streamA,
crlfDelay: Infinity
});
console.log(`streamA flowing = ${streamA.readableFlowing}`);
console.log(`readerA.closed = ${!!readerA.closed}`);
console.log('readA');
for await (const line of readerA) {
console.log(line);
}
console.log('success');
}
main();
Then, you get all the output:
stopCase = true
readB
b
streamA flowing = true
readerA.closed = false
readA
a
success
The reason you never get the console.log('success')
is probably because you hit the for await (const line of readerA) { ...}
loop and it gets stopped there on a promise that has no more data. Meanwhile, nodejs notices that there is nothing left in the process that can create any future events so it exits the process.
You can see that same concept in play in an even simpler app:
async function main() {
await new Promise(resolve => {
// do nothing
});
console.log('success');
}
main();
It awaits a promise that never completes and there are no event creating things left in the app so nodejs just shuts down with ever logging success
.