In the code below I tried to build an RxJS pipe and process the values emitted by a subject using pipe.
The code works, but the result is different from what I expected to get.
const rxjs = require("rxjs");
// Subject that the request callbacks push their result message into via subject.next().
var subject = new rxjs.Subject();
// NOTE: console.log('tap') is called once, right here, while the pipeline is
// being built; tap() is handed that call's return value (undefined), not a
// function to run for each emitted value.
subject
.pipe(rxjs.tap(console.log('tap')))
// Prints every value that reaches the subscriber.
.subscribe(console.log);
/**
 * Sends a GET request to httpbin.org's /delay endpoint and passes the
 * response together with its complete body to `cb`.
 *
 * @param {number|string} requestID - Value sent in the "request-id" request header.
 * @param {function} cb - Called as cb(res, body) once the response has ended.
 */
function sendRequest(requestID, cb) {
  // Random integer from 5 to 8; it is appended after "/delay/." in the path.
  var delay = Math.floor(Math.random() * (6 - 3 + 1)) + 5;
  var request = {
    hostname: "httpbin.org",
    headers: { "request-id": requestID },
    path: "/delay/." + delay,
  };
  require("http")
    .get(request, (res) => {
      var body = "";
      // Accumulate every chunk; a response body may arrive in several pieces.
      res.on("data", function (chunk) { body += chunk; });
      res.on("end", function () { cb(res, body); });
    })
    .end();
}
// Kick off five requests at once (the function declaration below is hoisted).
concurrentRequests(5);

/**
 * Starts `limit` requests without waiting for one to finish before starting
 * the next, and emits one message on `subject` for each response received.
 *
 * @param {number} limit - Number of requests to start; IDs run from 0 to limit - 1.
 */
function concurrentRequests(limit) {
  for (var i = 0; i < limit; i++) {
    sendRequest(i, function (res, body) {
      // Read the ID back from the "Request-Id" entry of the parsed body's
      // headers, so the message names the request this response belongs to.
      var reqID = JSON.parse(body).headers["Request-Id"];
      subject.next("Concurrent Response #" + reqID + " returned a " + res.statusCode);
    });
  }
}
My goal is to build a pipe that processes each value emitted by a Subject from the RxJS library.
When I ran the code above, I expected to get this output:
tap
Concurrent Response #1 returned a 200
tap
Concurrent Response #0 returned a 200
tap
Concurrent Response #2 returned a 200
tap
Concurrent Response #3 returned a 200
tap
Concurrent Response #4 returned a 200
But the output was:
tap
Concurrent Response #1 returned a 200
Concurrent Response #0 returned a 200
Concurrent Response #2 returned a 200
Concurrent Response #3 returned a 200
Concurrent Response #4 returned a 200
Why are the emitted values not going through the pipe?
CodePudding user response:
In
subject.pipe(rxjs.tap(console.log('tap')))
you provided a value instead of a callback. It should be:
subject.pipe(rxjs.tap(() => console.log('tap')))
You executed console.log('tap') immediately, while building the pipe,
and passed its return value, undefined,
to rxjs.tap,
while in fact you want to pass a callback (an observer) that runs for each emitted value.