I am still not clear on how data flows between streams in a pipe. Most examples start from a file or a built-in generator and finish in another file or an output, but few of them show how to build your own pipeline.
I need to take data I receive from an external machine, transform it, and retransmit it via a UDP socket (no stream-udp package, please). I know how to get the data, and I know how to send data via datagram, but I am confused about how to stream the data as it is received and sent.
Let's say I have something like this:
import { pipeline, Readable, Transform, Writable } from "stream";
const reader = new Readable();
const transform = new Transform();
const writable = new Writable();
// How to flow the data from reader to transform to writer?
transform.on('data', (chunk) => {
  console.log('Transforming ' + chunk);
  const newchunk = chunk + 'transformed ';
  // how to send the data from here to writable?
  transform.push(newchunk);
})
writable.on('???', (chunk) => {
  // write the chunk into something
  console.log(chunk);
})
pipeline(
  reader,
  transform,
  writable,
  (err) => {
    console.log('Done')
    if (err) console.error(err);
  }
)
reader.push('this is new data');
So, can someone please help me understand how things flow: the reader sends a text to the transform, the transform changes it, and the writable receives it?
A simple example should suffice, one that starts with reader.push('this is data')
and ends with writer.on('???', (chunk) => console.log(chunk))
Thank you.
CodePudding user response:
One of the problems with the streams API is that there are many ways to implement a stream, and the examples are not consistent.
Here is a straightforward example:
CharStream - a Readable stream: generates the characters a-z, then pushes null once it passes z to signal the end
Uppercasify - a Transform: takes each letter and changes it to uppercase
StringWritable - a Writable: aggregates all letters into one string and logs it to the console on finish
import { Transform, Readable, Writable } from 'readable-stream'
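class CharStream extends Readable {
  constructor(options = {}) {
    super(options)
    this.charCode = 97; // character code of 'a'
  }
  _read() {
    // push the current letter, then advance to the next one
    this.push(`${String.fromCharCode(this.charCode++)}\n`);
    // past 'z': pushing null signals the end of the stream
    if (this.charCode > 'z'.charCodeAt(0)) this.push(null);
  }
}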
class Uppercasify extends Transform {
  constructor(options = {}) {
    super(options)
    this.counter = 0
  }
  _transform(chunk, encoding, done) {
    done(null, chunk.toString().toUpperCase())
  }
}
class StringWritable extends Writable {
  constructor(options = {}) {
    super(options)
    this.currentString = "";
  }
  _write(chunk, encoding, done) {
    // append each letter (without its trailing newline) to the result
    this.currentString += chunk.toString().replace("\n", "")
    done()
  }
}
const cs = new CharStream()
const uts = new Uppercasify()
const out = new StringWritable()
const stream = cs.pipe(uts).pipe(out)
stream.on('finish', () => console.log(out.currentString))
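To map this back onto the shape of the code in the question, here is a minimal sketch that pushes data in by hand and sends each transformed chunk through a socket. It rests on a few assumptions: it uses CommonJS require so it runs as a plain script (swap in import for ES modules), the helper names createUdpWriter, createPrefixer and connect are hypothetical, and a stand-in object replaces the real socket so the example runs without a network. With a real socket from dgram.createSocket('udp4') the send call would look the same. There is no event to listen for on the writable side; the write function plays the role of the '???' handler.

```javascript
const { pipeline, Readable, Transform, Writable } = require("node:stream");

// Hypothetical factory: a Writable that forwards every chunk through
// socket.send(). `socket` is assumed to behave like a node:dgram socket.
function createUdpWriter(socket, port, host) {
  return new Writable({
    write(chunk, encoding, callback) {
      // Calling callback() tells the stream we are ready for the next chunk;
      // passing an error makes the whole pipeline fail.
      socket.send(chunk, port, host, (err) => callback(err));
    },
  });
}

// Hypothetical factory: a Transform that prepends a prefix to each chunk.
function createPrefixer(prefix) {
  return new Transform({
    transform(chunk, encoding, callback) {
      // callback(null, result) hands the result to the next stream.
      callback(null, prefix + chunk.toString());
    },
  });
}

// Wires reader -> transform -> writer and returns the reader, so the
// caller can push data into it whenever data arrives.
function connect(writer, onDone) {
  // A no-op read() means data only enters through explicit push() calls.
  const reader = new Readable({ read() {} });
  pipeline(reader, createPrefixer("transformed "), writer, onDone);
  return reader;
}

// Stand-in for a real UDP socket (an assumption for this demo): it only
// logs what would have been sent.
const fakeSocket = {
  send(msg, port, host, cb) {
    console.log(`to ${host}:${port} -> ${msg.toString()}`);
    cb(null);
  },
};

const reader = connect(createUdpWriter(fakeSocket, 41234, "127.0.0.1"), (err) => {
  if (err) console.error(err);
  else console.log("Done");
});

reader.push("this is new data");
reader.push(null); // no more data: lets the pipeline finish
```

Each reader.push() call travels through the transform function and ends in the write function. Pushing null ends the stream, which is what lets pipeline call its final callback.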