Home > database >  Typescript RXJS Subject await async susbscriptions
Typescript RXJS Subject await async susbscriptions

Time:12-09

Suppose I have two completely independent pieces of code in two completely unrelated classes that subscribe to the same Observable in a service class.

class MyService {
  private readonly subject = new Subject<any>();

  public observe(): Observable<any> {
    return this.subject.pipe();
  }
}

class A {
  constructor(private readonly service: MyService) {
    service.observe().subscribe( async (value) => { 
      await this.awaitAnOperation(value);
      console.log('Class A subscription complete', value);
    });
  }
}

class B {
  constructor(private readonly service: MyService) {
    service.observe().subscribe( (value) => console.log('Class B subscription complete', value));
  }
}

The issue that I am now facing is that when the service emits an event, the log of class B will come before A, even though A subscribed first. What I need is that all methods are ran and finished before going to the next one. I know if A were to be synchronously than my question would be solved, but A does need to run an async operation AND Bcan only log after A has logged.

A and B are completely unaware of eachother and should be as well. In e.g. C# we can run an async method synchrnonously by using GetAwaiter().Wait(); and is not considered a bad practice since when it needs to run on the main thread. An equivalent TS/JS option would be nice.

EDIT A subscribes before B. It is simply the chronological order of subscribing that should also execute. I know this is by default emitted in that sequence, but the fact remains that running a subscription method on a different thread would continue the main thread to the next subscription. This is what I need to avoid somehow.

CodePudding user response:

You can do that by combining the two Observables using switchMap rxjs operator. it will be guarantied that the second Observable B will not started unless the first one A is done.

Here a good example to this scenario in the section addressed "Combining Observables in series": https://blog.danieleghidoli.it/2016/10/22/http-rxjs-observables-angular/

CodePudding user response:

I had a similar issue that I solved with an operator I called forkConcat. Instead of subscribing multiple times, I made multiple operators and chained them so that source$.pipe(operatorA) would happen and complete before source$.pipe(operatorB) started, and that would complete before source$.pipe(operatorC) started, and all three completed before dealing with the next value from source$.

My code looked like this...

source$.pipe(
  forkConcat(
    concat,
    operatorA,
    operatorB,
    operatorC
) )

where forkConcat is defined as

import { merge, Observable, of } from 'rxjs';
import { concatMap, Operator } from 'rxjs/operators';

/*
Creates an operator that will fork several operators from the same input, and not proceed until all those things are done.

First Argument:
  If those things should be done in turn, send concat as the first argument.
  If each operator should be done in parallel, use merge (or undefined) as the first argument.
  To return an array of each operators' final values per value received by forkConcat, use forkJoin.
  You could also use combineLatest, etc.

All other arguments are operators.
*/

type Combiner<T> = (...array$: ObservableInput<T>[]) => Observable<T>;

export function forkConcat<T,R>(combiner: Combiner<T> = merge, ...arrayOperators: Operator<T, R>[]) {
  return concatMap<T,R>((x) => {
    try {
      const x$ = of(x);
      const o = arrayOperators
      .filter(op => !!op) // ignore falsy arguments
      .map(op => x$.pipe(op));
      return o.length ? combiner(...o) : x$;
    } catch (e) {
      throw new ForkConcatError(x, e, combiner, arrayOperators);
    }
  });
}

class ForkConcatError<T> extends Error {
  constructor(
    readonly receivedValue: T,
    readonly innerError: Error,
    readonly combiner: Combiner<T>,
    readonly arrayOperators: Operator<T, R>[]
  ) {
    super(innerError.message);
  }
}

It worked. But I've also got to tell you...

I threw it away

I slowly began to realize that the need for forkConcat was a sign that I should be doing things differently. I haven't seen your code but when you say they shouldn't know about each other yet one affects the other, I highly suspect you should consider the same. If, instead of global/shared variables, you had global/shared subjects and observables where the one that emitted to B was...

source$.pipe(
  concatMap(x => concat(
    of(x).pipe(
      operatorA,
      ignoreElwments()
    ),
    of(x) // B should receive the same value A did.
  )
)

... then you wouldn't have this issue and your code would be cleaner.

In my case, I went a different route. I made a State interface, and then instead of passing source values through my operators, I passed {source: Source, state: State} objects. This way, there was no longer any global variables at all! Every operator could be a pure function (or pure function plus side effects) using only the combined value/state pairs emitted into them. State errors were much harder to make! To use this tactic, start thinking of your A and B as operators (that don't know about each other) instead of subscribers (that don't know about each other) and your future self will likely be happier.

But just in case I'm wrong about that advice, you now have forkConcat.

  • Related