I've made a simplified example of the code I am working.
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function main(): Promise<void> {
const blocker = new rx.ReplaySubject<0>();
const subscription = rx.timer(0, 1000)
.pipe(
op.take(3),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
)
.subscribe({
next: x => console.log(`timer: next: [${x}]`),
error: err => console.log(`timer: error: [${err}]`),
complete: () => {
console.log("timer: complete");
blocker.next(0);
}
});
const promise = rx.lastValueFrom(blocker.asObservable()
.pipe(
op.single(),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
));
console.log("prepared to await");
await promise;
console.log("awaited!");
subscription.unsubscribe();
}
main()
.then(
() => console.log("all right"),
reason => console.log(`rejected: [${reason}]`))
.catch(err => console.log(`error! : ${err}`))
.finally(() => console.log("done done done"));
It works (sorta) except the part when "awaited!" is never printed to console, as well as any of the lines after the promise returned by main
function.
The actual console output is:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
And the one I expect is:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
awaited!
all right
Questions:
- Why is that happening? What is the nodejs 'magic' (with scheduler, I assume) involved here? Could you recommend any books on nodejs internal involved here?
- How to change the code to achieve expected output?
Thanks.
CodePudding user response:
I don't think this is about schedulers or nodejs. lastValueFrom
requires that the observable it is handed completes before it resolves the returned promise. In this case, it never completes.
One Solution
Try this:
blocker.next(0);
blocker.complete();
You should then get your expected output. Otherwise blocker never completes, which means it never emits a last value.
Another Solution
Replace
single()
with
take(1)
Then even though blocker itself doesn't complete, the observable you hand to lastValueFrom
will complete. Which will resolve your promise.
Yet Another Solution
Try this:
blocker.next(0);
blocker.next(1);
You should then get your expected output. Though this time it will be single
that resolves your promise with a SequenceError
(instead of the previous examples in which the promise resolves with the value 0
).