Rxjs observable to promise never resolved in nodejs

Time:12-16

I've made a simplified example of the code I am working on.

import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function main(): Promise<void> {
    const blocker = new rx.ReplaySubject<0>();
    const subscription = rx.timer(0, 1000)
        .pipe(
            op.take(3),
            op.observeOn(rx.queueScheduler),
            op.subscribeOn(rx.queueScheduler)
        )
        .subscribe({
            next: x => console.log(`timer: next: [${x}]`),
            error: err => console.log(`timer: error: [${err}]`),
            complete: () => {
                console.log("timer: complete");
                blocker.next(0);
            }
        });
    const promise = rx.lastValueFrom(blocker.asObservable()
        .pipe(
            op.single(),
            op.observeOn(rx.queueScheduler),
            op.subscribeOn(rx.queueScheduler)
        ));
    console.log("prepared to await");
    await promise;
    console.log("awaited!");
    subscription.unsubscribe();
}

main()
    .then(
        () => console.log("all right"),
        reason => console.log(`rejected: [${reason}]`))
    .catch(err => console.log(`error! : ${err}`))
    .finally(() => console.log("done done done"));

It works (sort of), except that "awaited!" is never printed to the console, and neither is anything from the handlers attached to the promise returned by the main function.

The actual console output is:

prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete

And the one I expect is:

prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
awaited!
all right

Questions:

  1. Why is this happening? What Node.js 'magic' (with the scheduler, I assume) is involved here? Could you recommend any books on the Node.js internals involved?
  2. How can I change the code to get the expected output?

Thanks.

CodePudding user response:

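I don't think this is about schedulers or Node.js. lastValueFrom resolves the returned promise only once the observable it is handed completes, and single() likewise cannot emit until its source completes. In this case blocker never completes, so the promise stays pending. Once the timer has finished, nothing is left to keep the event loop alive, so Node.js exits without running any of the code after the await.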
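
To make the "pending until completion" behaviour visible, here is a minimal sketch, assuming RxJS 7. The settlesWithin helper and the 50 ms timeout are hypothetical names and values introduced only for this illustration; they are not part of RxJS or of the question's code.

```typescript
import { Subject, lastValueFrom } from "rxjs";

// Hypothetical helper: resolves with the promise's value if it settles
// within `ms` milliseconds, otherwise with the string "pending".
async function settlesWithin<T>(promise: Promise<T>, ms: number): Promise<T | "pending"> {
    let handle: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<"pending">(resolve => {
        handle = setTimeout(() => resolve("pending"), ms);
    });
    try {
        return await Promise.race([promise, timeout]);
    } finally {
        clearTimeout(handle); // do not keep the process alive for the timeout
    }
}

async function demo(): Promise<[number | "pending", number | "pending"]> {
    const source = new Subject<number>();
    const last = lastValueFrom(source); // subscribes right away
    source.next(0);                     // a value arrives, but the source is still open
    const before = await settlesWithin(last, 50);
    source.complete();                  // only now is 0 known to be the last value
    const after = await settlesWithin(last, 50);
    return [before, after];
}

demo().then(([before, after]) => console.log(before, after)); // prints "pending 0"
```

The promise is still pending after the value has been emitted and settles only after complete() is called.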

One Solution

In the timer's complete handler, complete blocker right after emitting:

blocker.next(0);
blocker.complete();

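You should then get your expected output (followed by "done done done" from the finally handler). Without the complete() call, blocker never completes, so there is never a last value for lastValueFrom to resolve with.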
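
Put together, this fix might look like the sketch below, assuming RxJS 7. The observeOn/subscribeOn calls from the question are left out here for brevity, and the periodMs parameter is an addition of this sketch so that the demo finishes quickly.

```typescript
import { ReplaySubject, lastValueFrom, timer } from "rxjs";
import { single, take } from "rxjs/operators";

// periodMs is an assumption of this sketch; the question uses a fixed 1000 ms.
async function main(periodMs: number): Promise<number> {
    const blocker = new ReplaySubject<number>();
    const subscription = timer(0, periodMs)
        .pipe(take(3))
        .subscribe({
            next: x => console.log(`timer: next: [${x}]`),
            error: err => console.log(`timer: error: [${err}]`),
            complete: () => {
                console.log("timer: complete");
                blocker.next(0);
                blocker.complete(); // lets single() emit and lastValueFrom resolve
            }
        });
    console.log("prepared to await");
    const value = await lastValueFrom(blocker.pipe(single()));
    console.log("awaited!");
    subscription.unsubscribe();
    return value;
}

main(10).then(
    value => console.log(`all right: ${value}`),
    reason => console.log(`rejected: [${reason}]`));
```

Because blocker now completes, single() sees exactly one value followed by completion and passes that value on.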

Another Solution

Replace

single()

with

take(1)

Then, even though blocker itself doesn't complete, the observable you hand to lastValueFrom completes after its first value, which resolves your promise with 0.
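
A minimal sketch of this variant, assuming RxJS 7. It also shows firstValueFrom, which is not mentioned above but should behave the same way here, since it resolves on the first value without waiting for completion.

```typescript
import { ReplaySubject, firstValueFrom, lastValueFrom } from "rxjs";
import { take } from "rxjs/operators";

async function demo(): Promise<[number, number]> {
    const blocker = new ReplaySubject<number>();
    // take(1) completes the piped observable after the first value,
    // so lastValueFrom can resolve even though blocker stays open.
    const viaTake = lastValueFrom(blocker.pipe(take(1)));
    // firstValueFrom resolves as soon as one value arrives.
    const viaFirst = firstValueFrom(blocker);
    blocker.next(0); // note: blocker.complete() is never called
    return Promise.all([viaTake, viaFirst]);
}

demo().then(([a, b]) => console.log(a, b)); // prints "0 0"
```

Which of the two to prefer is a matter of taste; take(1) keeps the change closest to the original pipeline.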

Yet Another Solution

Try this:

blocker.next(0);
blocker.next(1);

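Your promise then settles as well, but not with the expected output: when the second value arrives, single() errors with a SequenceError, so the promise rejects (in the previous examples it resolves with the value 0). The await throws, and main's rejection handler prints "rejected: [...]" instead of "awaited!" and "all right".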
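
The rejection can be observed with a small sketch like the following, assuming RxJS 7, where SequenceError is exported from the "rxjs" package. The returned strings are labels made up for this illustration.

```typescript
import { ReplaySubject, SequenceError, lastValueFrom } from "rxjs";
import { single } from "rxjs/operators";

async function demo(): Promise<string> {
    const blocker = new ReplaySubject<number>();
    const promise = lastValueFrom(blocker.pipe(single()));
    blocker.next(0);
    blocker.next(1); // a second value: single() errors instead of waiting for completion
    try {
        await promise;
        return "resolved";
    } catch (err) {
        return err instanceof SequenceError
            ? "rejected with SequenceError"
            : "rejected with another error";
    }
}

demo().then(outcome => console.log(outcome)); // prints "rejected with SequenceError"
```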
