Home > Software design >  Values emitted by RxJS Subject not going through .pipe(...)
Values emitted by RxJS Subject not going through .pipe(...)

Time:07-31

In the code below I tried to build an RxJS pipe and process values emitted by a subject using pipe. The code works but the result is different from what I expected to get.

const rxjs = require("rxjs");
var subject = new rxjs.Subject();

subject
.pipe(rxjs.tap(console.log('tap')))
.subscribe(console.log);

function sendRequest(requestID, cb) {
  var delay = Math.floor(Math.random() * (6 - 3   1))   5;
  var request = {
    hostname: "httpbin.org",
    headers: { "request-id": requestID },
    path: "/delay/."   delay,
  };
  require("http")
    .get(request, (res) => {
      var body = "";
      res.on("data", function (chunk) { body  = chunk; });
      res.on("end", function () { cb(res, body); });
    })
    .end();
}

concurrentRequests(5);
function concurrentRequests(limit) {
  for (var i = 0; i < limit; i  ) {
    sendRequest(i, function (res, body) {
      var reqID = JSON.parse(body).headers["Request-Id"];
      subject.next("Concurrent Response #"   reqID   " returned a "   res.statusCode);
    });
  }
}

My purpose is to build a pipe to process each emitted value using Subject from the RxJS library and when I ran the code above I expected to get this output:

tap
Concurrent Response #1 returned a 200
tap
Concurrent Response #0 returned a 200
tap
Concurrent Response #2 returned a 200
tap
Concurrent Response #3 returned a 200
tap
Concurrent Response #4 returned a 200

But the output was:

tap
Concurrent Response #1 returned a 200
Concurrent Response #0 returned a 200
Concurrent Response #2 returned a 200
Concurrent Response #3 returned a 200
Concurrent Response #4 returned a 200

Why are the emitted values are not going through the pipe?

CodePudding user response:

In

subject.pipe(rxjs.tap(console.log('tap')))

you provided a value instead of a callback:

subject.pipe(rxjs.tap(() => console.log('tap')))

You executed console.log('tab') and passed undefined value to rxjs.tap, while in fact you want to add an observer.

  • Related