Proxy around a Writable stream causes `write after end` errors when piping data

Time:06-27

I want to create a wrapper around a Writable stream. But when I try to pipe() some data through that proxy, I end up with write after end errors.

Here is a self-contained example showing my problem:

const http = require("http");
const fs = require("fs");
const { Writable, Readable } = require('stream');

class MyStream extends Readable {
  constructor() {
    super();

    this._iterator = this.generator();
  }

  *generator() {
    for(const c of "Hello")
      yield c;
  }

  _read(n) {
    const it = this._iterator.next();
    if (it.done)
      this.push(null);
    else
      this.push(it.value);
  }
}

class Proxy extends Writable {
  constructor(req) {
    super();
    this._node_req = req;
  }

  end(chunk, encoding, cb) {
    console.trace("end");
    this._node_req.end(chunk, encoding, cb);
    return {}; // return something
  }

  _write(chunk, encoding, cb) {
    console.log("_write", chunk.toString("utf8"));
    return this._node_req.write(chunk, encoding, cb);
  }
}

const src = new MyStream()
const req = http.request("http://httpbingo.org/post", { method: "POST" }, (res) => {
  res.pipe(process.stderr);
});
src.pipe(new Proxy(req));

Here is the dump of what I obtain when I run this program:

sh$ node --version
v14.17.1
sh$ node t.js
_write H
Trace: end
    at Proxy.end (/home/sylvain/Projects/getpro/t.js:33:13)
    at MyStream.onend (internal/streams/readable.js:665:10)
    at Object.onceWrapper (events.js:481:28)
    at MyStream.emit (events.js:375:28)
    at endReadableNT (internal/streams/readable.js:1317:12)
    at processTicksAndRejections (internal/process/task_queues.js:82:21)
_write e
events.js:352
      throw er; // Unhandled 'error' event
      ^

Error [ERR_STREAM_WRITE_AFTER_END]: write after end
    at writeAfterEnd (_http_outgoing.js:694:15)
    at write_ (_http_outgoing.js:706:5)
    at ClientRequest.write (_http_outgoing.js:687:15)
    at Proxy._write (/home/sylvain/Projects/getpro/t.js:40:27)
    at doWrite (internal/streams/writable.js:377:12)
    at clearBuffer (internal/streams/writable.js:529:7)
    at onwrite (internal/streams/writable.js:430:7)
    at callback (internal/streams/writable.js:513:21)
    at afterWrite (internal/streams/writable.js:466:5)
    at onwrite (internal/streams/writable.js:446:7)
Emitted 'error' event on ClientRequest instance at:
    at writeAfterEndNT (_http_outgoing.js:753:7)
    at processTicksAndRejections (internal/process/task_queues.js:83:21) {
  code: 'ERR_STREAM_WRITE_AFTER_END'
}

While investigating this issue, I noticed I can fix it for file streams by forwarding the proxy's _write to the original object's _write method (and not to write without the underscore, as shown in the code above).

Unfortunately, the HTTP request object does not have a _write method. So I wonder if I can proxy it that way.

CodePudding user response:

MyStream will push data as fast as possible into its internal buffer, and emit an end event when it's done. This in turn calls Proxy.end(), as explained in the Node.js documentation for readable.pipe():

By default, stream.end() is called on the destination Writable stream when the source Readable stream emits 'end', so that the destination is no longer writable.

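However, the writable will still receive calls to _write() after end() has been called, because chunks that arrived while an earlier _write() was still pending are queued in the writable's own internal buffer and are only handed to _write() one at a time. Because you already ended the HTTP request in end(), forwarding those queued chunks fails with a write after end error.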
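
The ordering described above can be observed without any network access. The following is only a minimal sketch: Recorder, calls and finished are hypothetical names, Readable.from() stands in for MyStream, and setImmediate() simulates a destination that completes each write asynchronously.

```javascript
const { Readable, Writable } = require("stream");

// Records the order in which end() and _write() are invoked on the destination.
const calls = [];

class Recorder extends Writable {
  end(chunk, encoding, cb) {
    calls.push("end");
    // Delegate to Writable so the stream still finishes normally.
    return super.end(chunk, encoding, cb);
  }

  _write(chunk, encoding, cb) {
    calls.push("_write " + chunk.toString("utf8"));
    // Complete each write asynchronously, as a real I/O destination would.
    setImmediate(cb);
  }
}

// Resolves with the recorded call order once the destination has finished.
const finished = new Promise((resolve, reject) => {
  Readable.from(["H", "e", "l", "l", "o"])
    .pipe(new Recorder())
    .on("error", reject)
    .on("finish", () => resolve(calls));
});

finished.then((order) => console.log(order));
```

In the logged array, "end" should appear before the last "_write" entries, which is exactly the window in which the question's end() override closes the request too early.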

Overriding writable._final() is probably a better way to end the HTTP request:

class Proxy extends Writable {
  …
  _final(callback) {
    this._node_req.end(callback);
  }
}
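
For reference, here is a hedged, self-contained sketch of the _final() approach. It assumes the wrapped object only needs write(chunk, encoding, cb) and end(cb); a PassThrough stream stands in for the HTTP request so the sketch runs offline, and RequestProxy, target, received and result are hypothetical names.

```javascript
const { PassThrough, Readable, Writable } = require("stream");

class RequestProxy extends Writable {
  constructor(target) {
    super();
    // Any object exposing write() and end(), for example an http.ClientRequest.
    this._target = target;
  }

  _write(chunk, encoding, cb) {
    // Forward the chunk; cb fires once the target has processed it.
    this._target.write(chunk, encoding, cb);
  }

  _final(cb) {
    // Called only after every buffered chunk has gone through _write().
    this._target.end(cb);
  }
}

// Stand-in for the HTTP request (assumption: used here to avoid network access).
const target = new PassThrough();
const received = [];
target.on("data", (chunk) => received.push(chunk.toString("utf8")));

// Resolves with everything the target received once it has fully ended.
const result = new Promise((resolve, reject) => {
  target.on("error", reject);
  target.on("end", () => resolve(received.join("")));
});

Readable.from(["H", "e", "l", "l", "o"]).pipe(new RequestProxy(target));

result.then((text) => console.log(text));
```

Because end() is not overridden, the Writable base class decides when to call _final(), so the target is ended only after the last chunk has been forwarded.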

CodePudding user response:

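The problem is that pipe() calls write() on the proxy for each data event and calls end() on it as soon as the source emits end. By the time the second chunk reaches _write(), the underlying stream has already been ended. The overridden end() must not end the underlying stream itself; leave that to _final(). This code works: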

const http = require("http");
const fs = require("fs");
const { Writable, Readable } = require('stream');

class MyStream extends Readable {
  constructor() {
    super();

    this._iterator = this.generator();
  }

 // __proto__;

  *generator() {
    for(const c of "Hello")
      yield c;
  }

  _read(n) {
    const it = this._iterator.next();
    if (it.done)
      this.push(null);
    else
      this.push(it.value);
  }
}

class Proxy extends Writable {

  constructor(req) {
    super();
    
    this._node_req = req;
  }

 // __proto__;

  end(chunk, encoding, cb) {
    // Do not end the wrapped stream here. Delegate to Writable so that
    // _final() runs once every buffered chunk has been written.
    return super.end(chunk, encoding, cb);
  }

  _write(chunk, encoding, cb) {
    console.log("_write", chunk.toString("utf8"));
    // Forward the chunk; cb reports completion back to the Writable machinery.
    return this._node_req.write(chunk, encoding, cb);
  }

  _final(callback) {
    this._node_req.end(callback);
  }
}

const src = new MyStream()
src.pipe(new Proxy(fs.createWriteStream("out.tmp")));