How do you chain dependent subscriptions that need multiple previous observables

Time:07-28

New to rxjs, looking for a good clean way to chain some observables/subscriptions.

Here's some simple pseudocode I made to try to emulate what I'm trying to do.

class Baker implements OnInit {

  // functions, constructor, variables, etc.

  ngOnInit(): void {
    this.myFridgeService.getDough().pipe(
      switchMap(dough => {
        return of(this.addToBowl(dough));
      }),
      switchMap(bowl => {
        this.myPantryService.getMnMs()
          .subscribe((mnms: Candy[]) => {
            this.bagOfMnMs = mnms; // will use later
            return of(mnms).pipe(filter(mnm => mnm.isGreen()));
          }); 
      }),
      switchMap(greenOnes => {
        return this.myMixerService.mix(bowl, greenOnes); // can I get bowl here somehow?
      }))
      .subscribe(cookie => {
        this.bake(cookie);
      },
        (error: HttpErrorResponse) => {
          console.error(error);
          this.serviceError = error;
        }
      );
  }
}

I can't get bowl into myMixerService because it's from a previous switchMap, and as far as I know I can't pass along multiple returns.

I could put the myMixerService call into the subscription event of myPantryService, which would look something like this

class Baker implements OnInit {
  ngOnInit(): void {
    this.myFridgeService.getDough().pipe(
      switchMap(dough => {
        return of(this.addToBowl(dough));
      }),
      switchMap(bowl => {
        this.myPantryService.getMnMs()
          .subscribe((mnms: Candy[]) => {
            this.bagOfMnMs = mnms; // will use later
            const greenOnes = of(mnms).pipe(filter(mnm => mnm.isGreen()));
            return this.myMixerService.mix(bowl, greenOnes);
          },
            (error: HttpErrorResponse) => {
              console.error(error);
              this.serviceError = error;
            });
      })
    ).subscribe(cookie => {
      this.bake(cookie);
    },
      (error: HttpErrorResponse) => {
        console.error(error);
        this.serviceError = error;
      }
    );
  }
}

But I feel like putting that service call in a subscription event is an antipattern, and avoiding that is part of the reason for switchMap in the first place. And that's aside from the fact that it never returns back up to cookie.

CodePudding user response:

If you take any observable and subscribe to it, you get back a subscription. Subscriptions are not observables and cannot be turned into observables, so subscribing is a terminal operation. After you subscribe, you're done (sort of).

In short, subscribing within a switchMap runs counter to what you require.
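
To make that concrete, here is a minimal sketch using plain RxJS. The getDough and getMnMs functions are hypothetical stand-ins for the services in the question (candies are just color strings here), not the real API. It shows that subscribe() hands back a Subscription, while returning the inner observable lets switchMap do the subscribing and forward the values.

```typescript
import { Observable, Subscription, of } from "rxjs";
import { map, switchMap } from "rxjs/operators";

// Hypothetical stand-ins for the services in the question.
const getDough = (): Observable<string> => of("dough");
const getMnMs = (): Observable<string[]> => of(["green", "red", "green"]);

// subscribe() returns a Subscription: it has no pipe() and emits nothing,
// so there is nothing for switchMap to flatten.
const sub: Subscription = getMnMs().subscribe();
const isObservable = sub instanceof Observable; // false

// Returning the inner observable instead lets switchMap subscribe to it
// and pass whatever it emits on to the next operator.
const received: string[][] = [];
getDough().pipe(
  switchMap(() =>
    getMnMs().pipe(map(mnms => mnms.filter(color => color === "green")))
  )
).subscribe(greenOnes => received.push(greenOnes));
```

The callback given to switchMap has to return an observable (or something convertible to one); a callback that only subscribes returns undefined, which is why the original version never reaches cookie.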


Here's how I might rewrite your pseudocode into something that should compile
(assuming I've read your code properly)

this.myFridgeService.getDough().pipe(

  map(dough => this.addToBowl(dough)),
  switchMap(bowl => 
    this.myPantryService.getMnMs<Candy[]>().pipe(
      tap(mnms => this.bagOfMnMs = mnms), // will use later
      map(mnms => mnms.filter(mnm => mnm.isGreen())),
      map(greenOnes => [bowl, greenOnes] as const) // as const keeps this a typed tuple, not a union array
    )
  ),
  switchMap(([bowl, greenOnes]) => 
    this.myMixerService.mix(bowl, greenOnes)
  )

).subscribe({

  next: cookie => this.bake(cookie),
  error: e =>  {
    console.error(e);
    this.serviceError = e;
  },
  complete: () => console.log("Done Baking cookies")
  
});
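
If passing a tuple along feels awkward, another option is to nest the last switchMap inside the inner pipe, so bowl stays in scope through the closure. This is only a sketch under assumptions: the Candy and Bowl shapes and the getDough, getMnMs and mix stubs are made up for illustration and stand in for the real services.

```typescript
import { Observable, of } from "rxjs";
import { map, switchMap } from "rxjs/operators";

interface Candy { color: string; }
interface Bowl { contents: string[]; }

// Hypothetical stubs standing in for the fridge, pantry and mixer services.
const getDough = (): Observable<string> => of("dough");
const getMnMs = (): Observable<Candy[]> =>
  of([{ color: "green" }, { color: "red" }, { color: "green" }]);
const mix = (bowl: Bowl, candies: Candy[]): Observable<string> =>
  of(`cookie(${bowl.contents.join("+")}, ${candies.length} green)`);

const cookie$ = getDough().pipe(
  map((dough): Bowl => ({ contents: [dough] })), // stand-in for addToBowl
  switchMap(bowl =>
    getMnMs().pipe(
      map(mnms => mnms.filter(mnm => mnm.color === "green")),
      // bowl is still visible here because this pipe is nested inside
      // the arrow function that received it.
      switchMap(greenOnes => mix(bowl, greenOnes))
    )
  )
);

const baked: string[] = [];
cookie$.subscribe(cookie => baked.push(cookie));
```

The trade-off is one extra level of nesting in exchange for not having to pack and unpack intermediate values; with more than two or three dependent steps, the tuple approach tends to stay flatter.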

CodePudding user response:

You are mixing imperative and reactive styles, which is why you're having trouble. Your code should work without any (this.thing = thing) side-effects in the pipeline. Instead, define your pipelines without state and without side-effects, and then consume them in the template using the async pipe.

For example, don't do this style of code:

@Component({
  changeDetection: ChangeDetectionStrategy.Default,
  template: '<h2>V: {{ initialValue }} v*2: {{ doubledValue }}</h2>'
})
export class MyComponent implements OnInit {

  public initialValue: number;
  public doubledValue: number;

  ngOnInit() {
    this.valueService.getValue().pipe(
      tap(it => (this.initialValue = it)), 
      map(it => it*2)
    ).subscribe(it => (this.doubledValue = it));
  }

}

Instead, do this style:

@Component({
  changeDetection: ChangeDetectionStrategy.OnPush,
  template: '<h2>V: {{ initialValue$ | async }} v*2: {{ doubledValue$ | async }}</h2>'
})
export class MyComponent {

  public readonly initialValue$: Observable<number> = this.valueService.getValue();
  public readonly doubledValue$: Observable<number> = this.initialValue$.pipe(
    map(it => it*2)
  );

}

You'll get tons of benefits:

  • OnPush ChangeDetection, which is much, much, much more efficient
  • Automatic lifecycle management: Observables are subscribed to when the template renders, the template is re-rendered when the values change and only when the values change, and the observables get automatically unsubscribed from when the component is destroyed (no leaks)
  • No more mutable state (in my experience, the #1 cause of bugs)
  • Clean and simple: each value the template gets is the result of a single pipeline, so you know exactly where to look
  • Prevents nasty hacks, for example it's no longer possible for evil people to @ViewChild-inject this component into theirs and then overwrite its values
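
Applied to the baking example from the question, the same idea might look roughly like the sketch below. It is plain RxJS so it can run outside Angular, and the stubs (getDough, getMnMs, mix), the Candy shape and the pantryCalls counter are all assumptions made for illustration. The "this.bagOfMnMs = mnms" assignment is replaced by a shared stream, here using shareReplay so the pantry is only queried once.

```typescript
import { Observable, defer, of } from "rxjs";
import { map, shareReplay, switchMap } from "rxjs/operators";

interface Candy { color: string; }

let pantryCalls = 0; // counts how often the stub pantry is actually queried

// Hypothetical stubs standing in for the services in the question.
const getDough = (): Observable<string> => of("dough");
const getMnMs = (): Observable<Candy[]> =>
  defer(() => {
    pantryCalls++;
    return of([{ color: "green" }, { color: "red" }]);
  });
const mix = (bowl: string[], candies: Candy[]): Observable<string> =>
  of(`cookie with ${bowl.join("+")} and ${candies.length} green candies`);

// Instead of storing the bag in a field, expose it as its own stream.
// shareReplay caches the latest bag so several consumers can read it
// without triggering a second pantry request.
const bagOfMnMs$ = getMnMs().pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

const greenOnes$ = bagOfMnMs$.pipe(
  map(mnms => mnms.filter(mnm => mnm.color === "green"))
);

const cookie$ = getDough().pipe(
  map(dough => [dough]), // stand-in for addToBowl
  switchMap(bowl =>
    greenOnes$.pipe(switchMap(greenOnes => mix(bowl, greenOnes)))
  )
);

// In a component these would be bound with the async pipe; plain
// subscriptions are used here only to keep the sketch self-contained.
const bags: Candy[][] = [];
const cookies: string[] = [];
bagOfMnMs$.subscribe(bag => bags.push(bag));
cookie$.subscribe(cookie => cookies.push(cookie));
```

In a real component, bagOfMnMs$ and cookie$ would be readonly fields consumed in the template, and the bake step would become either another operator in the pipeline or part of what the template renders.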