Creating a Readable stream from emitted data chunks

Time:12-04

Short backstory: I am trying to create a Readable stream based on data chunks that are emitted back to my server from the client side with WebSockets. Here's a class I've created to "simulate" that behavior:

const { EventEmitter } = require('node:events');

class DataEmitter extends EventEmitter {
    constructor() {
        super();

        const data = ['foo', 'bar', 'baz', 'hello', 'world', 'abc', '123'];
        // Every second, emit an event with a chunk of data
        const interval = setInterval(() => {
            this.emit('chunk', data.splice(0, 1)[0]);

            // Once there are no more items, emit an event
            // notifying that that is the case
            if (!data.length) {
                this.emit('done');
                clearInterval(interval);
            }
        }, 1e3);
    }
}

Throughout this post, dataEmitter is created like this:

// Our data is being emitted through events in chunks from some place.
// This is just to simulate that. We cannot change the flow - only listen
// for the events and do something with the chunks.
const dataEmitter = new DataEmitter();

Right, so I initially tried this:

const readable = new Readable();

dataEmitter.on('chunk', (data) => {
    readable.push(data);
});

dataEmitter.once('done', () => {
    readable.push(null);
});

But that results in this error:

Error [ERR_METHOD_NOT_IMPLEMENTED]: The _read() method is not implemented

So I did this, implementing read() as an empty function:

const readable = new Readable({
    read() {},
});

dataEmitter.on('chunk', (data) => {
    readable.push(data);
});

dataEmitter.once('done', () => {
    readable.push(null);
});

And it works when piping into a write stream, or sending the stream to my test API server. The resulting .txt file looks exactly as it should:

foobarbazhelloworldabc123

However, I feel like there's something quite wrong and hacky with my solution. I attempted to put the listener registration logic (.on('chunk', ...) and .once('done', ...)) within the read() implementation; however, read() seems to get called multiple times, and that results in the listeners being registered multiple times.

The Node.js documentation says this about the _read() method:

When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. _read() will be called again after each call to this.push(dataChunk) once the stream is ready to accept more data. _read() may continue reading from the resource and pushing data until readable.push() returns false. Only when _read() is called again after it has stopped should it resume pushing additional data into the queue.

After dissecting this, it seems that the consumer of the stream calls upon .read() when it's ready to read more data. And when it is called, data should be pushed into the stream. But, if it is not called, the stream should not have data pushed into it until the method is called again (???). So wait, does the consumer call .read() when it is ready for more data, or does it call it after each time .push() is called? Or both?? The docs seem to contradict themselves.
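A small experiment may help untangle the two readings of the docs. The sketch below counts calls to the internal _read() on a stream that is driven by hand in paused mode. The names demo, readCalls and callsSeen are made up for illustration. It suggests that both readings hold: the public read() triggers _read() when the buffer is low, but not while an earlier request is still unanswered, and a push() is what settles that request so that _read() can be called again.

```javascript
const { Readable } = require('node:stream');

let readCalls = 0;

// A stream whose _read() only counts its calls. Data is pushed from the
// outside, as in the empty read() approach above.
const demo = new Readable({
    read() {
        readCalls += 1;
    },
});

const callsSeen = [];

demo.read();                // buffer is empty, so _read() is called
callsSeen.push(readCalls);

demo.read();                // earlier request still pending: no new _read()
callsSeen.push(readCalls);

demo.push('a');             // data arrives and settles the pending request
const chunk = demo.read();  // returns the data and asks for more via _read()
callsSeen.push(readCalls);

console.log(callsSeen, String(chunk)); // [ 1, 1, 2 ] a
```

In flowing mode (for example when piping) the stream makes these read() calls itself, which is why _read() appears to run once after every push().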

Implementing .read() on Readable is straightforward when you've got a basic resource to stream, but what would be the proper way of implementing it in this case?

And also, would someone be able to explain in better terms what the .read() method is on a deeper level, and how it should be implemented?

Thanks!

Response to the answer:

I did try registering the listeners within the read() implementation, but because it is called multiple times by the consumer, it registers the listeners multiple times.

Observing this code:

const readable = new Readable({
    read() {
        console.log('called');

        dataEmitter.on('chunk', (data) => {
            readable.push(data);
        });

        dataEmitter.once('done', () => {
            readable.push(null);
        });
    },
});

readable.pipe(createWriteStream('./data.txt'));

The resulting file looks like this:

foobarbarbazbazbazhellohellohellohelloworldworldworldworldworldabcabcabcabcabcabc123123123123123123123

Which makes sense, because the listeners are being registered multiple times.
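The pattern can be reproduced without any stream at all. This is a minimal sketch, assuming read() runs once before each chunk arrives (which is what the output above suggests); registerListeners is a hypothetical stand-in for the body of read().

```javascript
const { EventEmitter } = require('node:events');

const emitter = new EventEmitter();
const received = [];

// Stand-in for the body of read(): adds one more 'chunk' listener per call.
function registerListeners() {
    emitter.on('chunk', (chunk) => {
        received.push(chunk);
    });
}

for (const chunk of ['foo', 'bar', 'baz']) {
    registerListeners();          // read() is called again...
    emitter.emit('chunk', chunk); // ...and every listener so far gets the chunk
}

console.log(received.join(''));             // foobarbarbazbazbaz
console.log(emitter.listenerCount('chunk')); // 3
```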

CodePudding user response:

In general, the _read() method of a Readable stream is used to implement the logic for how the stream should read data from its source. When a consumer of the stream calls the read() method, the _read() method will be called to provide data to the stream. This data will be pushed into the stream's buffer, where it can be read by the consumer.

In your specific case, you are using events to provide data to the stream, rather than a traditional data source like a file or network connection. You can still use _read() to hook up the events, but _read() may be called many times over the life of the stream, so the listeners must be guarded so that they are registered only once.

Here is an example of how you might implement this for your dataEmitter (note that the constructor option is named read, not _read):

let registered = false;

const readable = new Readable({
  read() {
    // read() can run many times, so register the listeners only once
    if (registered) return;
    registered = true;

    // Register event listeners for the 'chunk' and 'done' events
    dataEmitter.on('chunk', (data) => {
      readable.push(data);
    });
    dataEmitter.once('done', () => {
      readable.push(null);
    });
  },
});

In this example, read() runs when the consumer first asks for data and may run again after each push(). The registered flag makes every call after the first a no-op, so the event listeners are registered only once, and data from the 'chunk' events is pushed into the stream as it is received.

As for the deeper meaning of the read() method on a Readable stream, this method is used by the consumer of the stream to request data from the stream. When the consumer calls read(), the _read() method is called to provide data to the stream, which is then pushed into the stream's buffer. The consumer can then access this data by reading from the stream. This allows the consumer to control the flow of data from the stream, and ensure that it only receives data when it is ready to process it.

Update:

In your code, you are creating a readable stream using the Readable class, but you are not implementing the _read() method. This is why you are seeing the error Error [ERR_METHOD_NOT_IMPLEMENTED]: The _read() method is not implemented.

To fix this error, you can implement the _read() method in your Readable class. The _read() method is the stream's signal that the consumer is ready to receive data. In your case, you can register listeners for the 'chunk' and 'done' events on the dataEmitter within the _read() method, as long as you do so only once, because _read() is called repeatedly. When the 'chunk' event is emitted, you can push the data into the stream using the this.push() method. When the 'done' event is emitted, you can call this.push(null) to signal the end of the stream.

Here is an example of how you can implement the _read() method in your Readable class:

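const { EventEmitter } = require("node:events");
const { Readable } = require("node:stream");
const { createWriteStream } = require("node:fs");

class DataEmitter extends EventEmitter {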
  constructor() {
    super();

    const data = ["foo", "bar", "baz", "hello", "world", "abc", "123"];
    // Every second, emit an event with a chunk of data
    const interval = setInterval(() => {
      this.emit("chunk", data.splice(0, 1)[0]);

      // Once there are no more items, emit an event
      // notifying that that is the case
      if (!data.length) {
        this.emit("done");
        clearInterval(interval);
      }
    }, 1e3);
  }
}

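class MyReadable extends Readable {
  // Tracks whether the listeners have been attached, because
  // _read() is called more than once.
  #registered = false;

  constructor(dataEmitter) {
    super();
    this.dataEmitter = dataEmitter;
  }

  _read() {
    if (this.#registered) return;
    this.#registered = true;

    this.dataEmitter.on("chunk", (data) => {
      this.push(data);
    });

    this.dataEmitter.once("done", () => {
      this.push(null);
    });
  }
}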

const dataEmitter = new DataEmitter();
const myReadable = new MyReadable(dataEmitter);

// Pipe the readable stream into a write stream
myReadable.pipe(createWriteStream(...));

The _read() method is called by the consumer of the stream when it is ready to receive more data. It is the responsibility of the _read() method to push data into the stream using the this.push() method. When the _read() method is called again after it has stopped pushing data, it should resume pushing data into the stream.
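For contrast with the event-driven case, here is a rough sketch of the stop-and-resume rule for a source that can be pulled from on demand. ArrayReadable and its in-memory array are invented for illustration, and the small highWaterMark is only there to make push() return false early.

```javascript
const { Readable } = require('node:stream');

// Hypothetical pull-style source: an in-memory array of chunks.
class ArrayReadable extends Readable {
  #items;
  readCalls = 0;

  constructor(items, options) {
    super(options);
    this.#items = [...items];
  }

  _read() {
    this.readCalls += 1;
    // Keep pushing until the internal buffer is full...
    while (this.#items.length > 0) {
      // ...and stop as soon as push() returns false. The stream calls
      // _read() again once the consumer has drained the buffer.
      if (!this.push(this.#items.shift())) return;
    }
    // Nothing left: signal the end of the stream.
    this.push(null);
  }
}

const arrayReadable = new ArrayReadable(
  ['foo', 'bar', 'baz', 'hello', 'world'],
  { highWaterMark: 4 }, // tiny buffer so that backpressure kicks in
);

let collected = '';
arrayReadable.on('data', (chunk) => {
  collected += chunk;
});
arrayReadable.on('end', () => {
  console.log(collected); // foobarbazhelloworld
  console.log('_read() calls:', arrayReadable.readCalls);
});
```

An event emitter that cannot be paused has no equivalent of the early return, which is why the listener-based versions simply keep pushing.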

Hope this helps!

CodePudding user response:

Seems like the only purpose of actually implementing the read() method is to only start receiving the chunks and pushing them into the stream when the consumer is ready for that.

Based on these conclusions, I've come up with this solution.

class MyReadable extends Readable {
    // Keep track of whether or not the listeners have already
    // been added to the data emitter.
    #registered = false;

    _read() {
        // If the listeners have already been registered, do
        // absolutely nothing.
        if (this.#registered) return;

        // "Notify" the client via websockets that we're ready
        // to start streaming the data chunks.
        const emitter = new DataEmitter();

        const handler = (chunk: string) => {
            this.push(chunk);
        };

        emitter.on('chunk', handler);

        emitter.once('done', () => {
            this.push(null);
            // Clean up the listener once it's done (this is
            // assuming the emitter object will still be used
            // in the future).
            emitter.off('chunk', handler);
        });

        // Mark the listeners as registered.
        this.#registered = true;
    }
}

const readable = new MyReadable();

readable.pipe(createWriteStream('./data.txt'));

But this implementation doesn't allow for the consumer to control when things are pushed. I guess, however, in order to achieve that sort of control, you'd need to communicate with the resource emitting the chunks to tell it to stop until the read() method is called again.
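That idea could look roughly like the sketch below. It assumes the resource can be told to pause and resume, which the DataEmitter above cannot do. PausableSource is a hypothetical stand-in for a WebSocket client that honours "pause" and "resume" messages, and EmitterReadable, pauseCount and finished are likewise made-up names.

```javascript
const { Readable } = require('node:stream');
const { EventEmitter } = require('node:events');

// Hypothetical source that starts paused and only emits while resumed.
class PausableSource extends EventEmitter {
    #items;
    #paused = true;
    #scheduled = false;
    #done = false;
    pauseCount = 0;

    constructor(items) {
        super();
        this.#items = [...items];
    }

    pause() {
        if (!this.#paused) this.pauseCount += 1;
        this.#paused = true;
    }

    resume() {
        if (this.#done) return;
        this.#paused = false;
        this.#scheduleNext();
    }

    #scheduleNext() {
        if (this.#scheduled || this.#paused) return;
        this.#scheduled = true;
        setImmediate(() => {
            this.#scheduled = false;
            if (this.#paused) return;
            if (this.#items.length === 0) {
                this.#done = true;
                this.emit('done');
                return;
            }
            this.emit('chunk', this.#items.shift());
            this.#scheduleNext();
        });
    }
}

class EmitterReadable extends Readable {
    #source;

    constructor(source, options) {
        super(options);
        this.#source = source;
        // Listeners are registered exactly once, here in the constructor.
        source.on('chunk', (chunk) => {
            // push() returning false means the buffer is full.
            if (!this.push(chunk)) source.pause();
        });
        source.once('done', () => this.push(null));
    }

    _read() {
        // The consumer wants more data, so let the source emit again.
        this.#source.resume();
    }
}

const source = new PausableSource(['foo', 'bar', 'baz', 'hello', 'world']);
const readable = new EmitterReadable(source, { highWaterMark: 4 });

// Slow consumer, so that the buffer fills up and the source gets paused.
const finished = (async () => {
    let output = '';
    for await (const chunk of readable) {
        await new Promise((resolve) => setTimeout(resolve, 10));
        output += chunk;
    }
    return output;
})();

finished.then((output) => {
    console.log(output); // foobarbazhelloworld
    console.log('source paused', source.pauseCount, 'time(s)');
});
```

Because the source starts paused and only resumes from _read(), nothing is emitted until the consumer actually asks for data. Over a real WebSocket, chunks already in flight when the pause message is sent would still arrive and be buffered.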
