I am trying to create something like an event loop with RxJS. I use firstValueFrom as a gate to wait for all events to be processed before going further.
The goal is to have a Node.js service that runs, reacts to various events, processes them, and can shut down gracefully when commanded.
I see behaviour I cannot explain. When the exit condition can be met, everything works as expected: events are emitted by issuers and handled by handlers.
However, when I remove the possibility of the exit event appearing, the program exits immediately after the rx.firstValueFrom call.
The code:
import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function foo(): Promise<string> {
    console.log("1");
    const s = new rx.ReplaySubject<string>();
    const t = rx.timer(1000)
        .pipe(
            op.take(3),
            op.map(x => x.toString()),
            op.endWith("exit"),
        );
    const exitObserver = s.asObservable()
        .pipe(
            op.mergeWith(t),
            op.filter(x => x === "exit")
        );
    console.log("2");
    const firstValue = await rx.firstValueFrom(exitObserver);
    console.log("3");
    return firstValue;
}

foo()
    .then(x => console.log(`result: ${x}`))
    .catch(e => console.error(e))
    .finally(() => console.log('finally'))
Output:
1
2
3
result: exit
finally
Which is expected.
Altered code, which I expect to wait indefinitely (the "exit" event is commented out):
import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function foo(): Promise<string> {
    console.log("1");
    const s = new rx.ReplaySubject<string>();
    const t = rx.timer(1000)
        .pipe(
            op.take(3),
            op.map(x => x.toString()),
            //op.endWith("exit"),
        );
    const exitObserver = s.asObservable()
        .pipe(
            op.mergeWith(t),
            op.filter(x => x === "exit")
        );
    console.log("2");
    const firstValue = await rx.firstValueFrom(exitObserver);
    console.log("3");
    return firstValue;
}

foo()
    .then(x => console.log(`result: ${x}`))
    .catch(e => console.error(e))
    .finally(() => console.log('finally'))
Output:
1
2
Which is not expected. I expect this code to keep running indefinitely, waiting for the "exit" event. There are no error messages. I use TypeScript 4.3.5, Node v14.15.4, RxJS 7.4.0.
My questions are:
- Why doesn't the altered code go into an indefinite loop waiting for the non-existent message?
- How do I create an indefinite loop with RxJS?
CodePudding user response:
- Why doesn't the altered code go into an indefinite loop waiting for the non-existent message?
It does. That's why
console.log("3");
return firstValue;
never gets run. You never log a result, etc.
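One detail that may explain why the process ends instead of hanging: as far as I know, a promise that never settles does not by itself keep a Node.js process running. Once the timer has fired and completed, no active handles are left, so Node exits while the await is still pending, which would also explain why finally is never printed. Below is a minimal sketch of one way around that, assuming you are fine with holding a timer open while you wait. The helper name awaitKeepingProcessAlive and the stand-in exitGate promise are hypothetical and not part of RxJS.

```typescript
// Hypothetical helper (not part of RxJS): awaits `gate` while holding a timer
// open, so Node.js has an active handle and keeps running until the gate settles.
async function awaitKeepingProcessAlive<T>(gate: Promise<T>, tickMs = 60000): Promise<T> {
    // A pending promise alone does not keep Node.js alive; an active timer does.
    const keepAlive = setInterval(() => { /* intentionally empty */ }, tickMs);
    try {
        return await gate;
    } finally {
        // Release the handle so the process can exit once the gate has settled.
        clearInterval(keepAlive);
    }
}

// Stand-in for the real exit signal: resolves with "exit" after 50 ms.
const exitGate = new Promise<string>(resolve => setTimeout(() => resolve("exit"), 50));

const gateResult = awaitKeepingProcessAlive(exitGate);
gateResult.then(x => console.log(`result: ${x}`)); // prints "result: exit"
```

In the code from the question, the gate would be the existing promise, for example await awaitKeepingProcessAlive(rx.firstValueFrom(exitObserver)). In a real service, an open server socket or similar handle would usually play the same role as the timer.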
How to create an indefinite loop with RxJs?
Not sure what you mean. The following code prints a number every 5 seconds, forever. Is that what you mean by an indefinite loop?
import { interval } from "rxjs";
interval(5000).subscribe(console.log);
CodePudding user response:
It cannot write 3 to the output because it is still waiting for the first value from exitObserver. The filter only lets "exit" through, and that value is never emitted, so the promise never resolves. The term "indefinite loop" is probably misleading in the RxJS world.
You can use takeUntil to achieve your goal.
const {Subject} = rxjs;
const {filter, takeUntil} = rxjs.operators;

const actions$ = new Subject();

actions$
  .pipe(
    // Handle every action except the exit command itself
    filter(action => action !== 'exit'),
    // Complete the stream as soon as 'exit' is emitted
    takeUntil(actions$.pipe(
      filter(action => action === 'exit')
    ))
  )
  .subscribe({
    next: action => console.log(`result: ${action}`),
    error: error => console.error(error),
    complete: () => console.log('exit'),
  });

actions$.next('first');
actions$.next('second');
actions$.next('exit');
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
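If you still want an awaitable gate like in the question, the takeUntil pipeline can be wrapped in a promise. This is only a sketch, assuming RxJS 7 (lastValueFrom with a defaultValue config) and string actions. The function name runUntilExit and the handled array are hypothetical and only there for illustration.

```typescript
import { Subject, lastValueFrom } from "rxjs";
import { filter, takeUntil, tap } from "rxjs/operators";

// Hypothetical wrapper: handles every action until "exit" arrives, then
// resolves with the list of actions that were handled.
async function runUntilExit(actions$: Subject<string>): Promise<string[]> {
    const handled: string[] = [];
    const exit$ = actions$.pipe(filter(action => action === "exit"));

    const work$ = actions$.pipe(
        filter(action => action !== "exit"),
        tap(action => handled.push(action)), // stand-in for real event handling
        takeUntil(exit$),                    // completes the stream on "exit"
    );

    // lastValueFrom subscribes right away and resolves when work$ completes.
    // The default value avoids an EmptyError if "exit" arrives before any action.
    await lastValueFrom(work$, { defaultValue: undefined });
    return handled;
}

const demoActions$ = new Subject<string>();
const handledActions = runUntilExit(demoActions$); // subscribes synchronously

demoActions$.next("first");
demoActions$.next("second");
demoActions$.next("exit");

handledActions.then(handled => console.log(handled)); // prints [ 'first', 'second' ]
```

Because a Subject is hot, runUntilExit has to be called before the first action is emitted, otherwise earlier actions are missed.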