Home > Back-end >  How do I make Observable await for async callbacks in next() and call them in order of subscription?
How do I make Observable await for async callbacks in next() and call them in order of subscription?

Time:10-28

It seems I should manage to find the answer just by googling around, but I could not, so here I am.

Let's say I have this code:

const something = async (): Promise<void> => {
    return new Promise<void>((resolve, _reject) => {
        setTimeout(resolve, 500);
    });
};

const s = new Subject<number>();
s.subscribe(async (n) => {
    await something(); 
    console.log(n);
});
s.subscribe(async (n) => {
    await something(); 
    console.log(n   1);
});
s.next(1);

is that code guaranteed to always output

1
2

in that exact order? Well, it turns out it will probably output that, because both Promises have a 500ms delay, but there's no guarantee. In fact if I change the code to:

const something = async (): Promise<number> => {
    return new Promise<number>((resolve, _reject) => {
        const waitFor = Math.random() * 1000;
        setTimeout(() => resolve(waitFor), waitFor);
    });
};

const s = new Subject<number>();
s.subscribe(async (n) => {
    const delay = await something(); 
    console.log(n   " / "   delay);
});
s.subscribe(async (n) => {
    const delay = await something(); 
    console.log((n   1)   " / "   delay);
});
s.next(1);

then you can experience the second subscriber sometimes running before the first finishes, based on the random delay each one gets.

Now I need the subscribers to run in the same order they subscribed, but I can't provide blocking callbacks, because my callbacks must be async in order to await for anything.

Is there a way to make Observable itself await for each subscriber before calling the next one?

CodePudding user response:

The actual misconception here is, that you think observables will do something with a return value from a subscribe call, which is not the case.

As you can see in the function source code of observables (subscribe(next: (value: T) => void): Subscription;), this function should be a void function. Defining your function as async basically returns a promise, which is not used by the observable. Therefore the observable will also not wait for any subscription to finish.

It kind of seems like you are trying to misuse the intended information flow for observables (which is strictly from the observable to subscribers) in order to return some information back to the stream. You will not be able to achieve this.

I suggest you to describe the actual use case (like business logic) in a question. You will probably get a hint on which kind of rxjs operators to use in order to achieve the expected result.

  • Related