RxJS firstValueFrom never resolves

Time:10-30

I am trying to build something like an event loop with RxJS, using firstValueFrom as a gate that waits for all events to be processed before going further. The goal is a Node.js service that reacts to various events, processes them, and shuts down gracefully when commanded.

I see behaviour I cannot explain. When the exit condition can be met, everything works as expected: events are emitted by issuers and handled by handlers.

However, when I remove the possibility of the exit event appearing, the code exits immediately after the rx.firstValueFrom call.

The code:

import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function foo(): Promise<string> {
    console.log("1");
    const s = new rx.ReplaySubject<string>();
    const t = rx.timer(1000)
        .pipe(
            op.take(3),
            op.map(x => x.toString()),
            op.endWith("exit"),
        );

    const exitObserver = s.asObservable()
        .pipe(
            op.mergeWith(t),
            op.filter(x => x === "exit")
        );
    console.log("2");
    const firstValue = await rx.firstValueFrom(exitObserver);
    console.log("3");
    return firstValue;
}

foo()
    .then(x => console.log(`result: ${x}`))
    .catch(e => console.error(e))
    .finally(() => console.log('finally'))

Output:

1
2
3
result: exit
finally

Which is expected.

Altered code, which I expect to loop indefinitely (the "exit" event is commented out):

import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function foo(): Promise<string> {
    console.log("1");
    const s = new rx.ReplaySubject<string>();
    const t = rx.timer(1000)
        .pipe(
            op.take(3),
            op.map(x => x.toString()),
            //op.endWith("exit"),
        );

    const exitObserver = s.asObservable()
        .pipe(
            op.mergeWith(t),
            op.filter(x => x === "exit")
        );
    console.log("2");
    const firstValue = await rx.firstValueFrom(exitObserver);
    console.log("3");
    return firstValue;
}

foo()
    .then(x => console.log(`result: ${x}`))
    .catch(e => console.error(e))
    .finally(() => console.log('finally'))

Output:

1
2

Which is not expected. I expect this code to run indefinitely, waiting for the "exit" event. There are no error messages. I use TypeScript 4.3.5, Node v14.15.4, RxJS 7.4.0.

My questions are:

  1. Why doesn't the altered code go into an indefinite loop waiting for a non-existent message?
  2. How do I create an indefinite loop with RxJS?

CodePudding user response:

  1. Why doesn't the altered code go into an indefinite loop waiting for a non-existent message?

It does. That's why

console.log("3");
return firstValue;

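never gets run: the awaited promise stays pending, so no result is logged. The process still ends because, in Node.js, a pending promise alone does not keep the event loop alive once the timer has fired.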
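
One way to check that the promise is pending rather than lost is to race it against a short timer. The sketch below uses plain promises instead of RxJS; `settlesWithin` and `neverSettles` are hypothetical names introduced only for illustration, with `neverSettles` standing in for `firstValueFrom(exitObserver)` when "exit" is never emitted.

```typescript
// Hypothetical helper: reports whether a promise settles within `ms` milliseconds.
function settlesWithin<T>(promise: Promise<T>, ms: number): Promise<boolean> {
    return new Promise<boolean>(resolve => {
        const timer = setTimeout(() => resolve(false), ms);
        const settled = () => {
            clearTimeout(timer);
            resolve(true);
        };
        // Resolution and rejection both count as "settled".
        promise.then(settled, settled);
    });
}

// Stand-in for the awaited promise in the altered code: it never resolves or rejects.
const neverSettles = new Promise<string>(() => { /* intentionally empty */ });

settlesWithin(neverSettles, 100).then(settled =>
    console.log(settled ? "settled" : "still pending")); // prints "still pending"
```

The script ends right after printing, with `neverSettles` still unsettled, which mirrors the `1`, `2` output in the question.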


  2. How do I create an indefinite loop with RxJS?

Not sure what you mean. The following code prints a number every 5 seconds, forever. Is that what you mean by an indefinite loop?

import { interval } from "rxjs";
interval(5000).subscribe(console.log);
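
Applied to the service described in the question, one possible shape is to merge the command stream with a source that stays active and await the first "exit". This is only a sketch: `commands$`, `waitForExit` and the heartbeat are assumptions made for illustration, and in a real service the active sources would be sockets, signal handlers, and so on.

```typescript
import { Subject, firstValueFrom, interval, merge } from "rxjs";
import { filter, map } from "rxjs/operators";

// Hypothetical command stream; a real service would feed it from its event sources.
const commands$ = new Subject<string>();

// Resolves with the first "exit" command. The interval stands in for ongoing work;
// while it is subscribed, a timer stays scheduled and Node.js keeps running.
function waitForExit(heartbeatMs = 100): Promise<string> {
    const heartbeat$ = interval(heartbeatMs).pipe(map(n => `tick ${n}`));
    const exit$ = merge(commands$, heartbeat$).pipe(filter(x => x === "exit"));
    return firstValueFrom(exit$);
}

waitForExit().then(x => console.log(`result: ${x}`));

// Simulate a shutdown command arriving later.
setTimeout(() => commands$.next("exit"), 300);
```

Once the promise resolves, `firstValueFrom` unsubscribes from the merged stream, which stops the interval and lets the process exit on its own.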

CodePudding user response:

  1. It cannot write 3 to the output because it is still waiting for the first value from exitObserver. The filter lets only "exit" through, so that value never arrives. The term "indefinite loop" is probably misleading in the RxJS world.

  2. You can use takeUntil to achieve your goal.

import { Subject } from "rxjs";
import { filter, takeUntil } from "rxjs/operators";

const actions$ = new Subject();

actions$
  .pipe(
    filter(action => action !== 'exit'),
    takeUntil(actions$.pipe(
      filter(action => action === 'exit')
    ))
  )
  .subscribe({
    next: action => console.log(`result: ${action}`),
    error: error => console.error(error),
    complete: () => console.log('exit'),
  });
  
actions$.next('first');
actions$.next('second');
actions$.next('exit');
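
Both answers rest on the fact that `firstValueFrom` settles only when its source emits or completes. A minimal sketch of the cases, assuming RxJS 7 (the `demo` function is a hypothetical name used only here):

```typescript
import { EMPTY, EmptyError, NEVER, firstValueFrom, of } from "rxjs";

async function demo(): Promise<string[]> {
    const log: string[] = [];

    // Source emits: the promise resolves with the first value.
    log.push(await firstValueFrom(of("exit", "ignored")));

    // Source completes without emitting: the promise rejects with EmptyError...
    try {
        await firstValueFrom(EMPTY);
    } catch (e) {
        log.push(e instanceof EmptyError ? "EmptyError" : "other error");
    }

    // ...unless a default value is supplied.
    log.push(await firstValueFrom(EMPTY, { defaultValue: "fallback" }));

    // Source neither emits nor completes: the promise stays pending forever,
    // so it is deliberately not awaited here.
    void firstValueFrom(NEVER);

    return log;
}

demo().then(log => console.log(log)); // prints [ 'exit', 'EmptyError', 'fallback' ]
```

The altered code in the question falls into the last case: the ReplaySubject is never completed, so the merged stream cannot complete, and the filter never lets a value through.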
