Home > Back-end >  How do I chain together 2 forkJoins so the second call can use the data from the first?
How do I chain together 2 forkJoins so the second call can use the data from the first?

Time:06-06

I need to be able to chain 2 forkJoins together as the second group of observables requires the output of the first set of observables.

The problem is if I do the second one inside a .pipe(map(...)) the observable will return another observable rather than resolve group 1 and then resolve group 2. This means I have to have a subscribe in the subscribe and two separate error handlers like this.

var o = forkJoin(
    a: getObservableA(),
    b: getObservableB(),
    c: getObservableC()
).pipe(
    map( result => {
        a2: processA(result.a),
        b2: processB(result.b),
        c2: processC(result.c)
    }); 
)

o.subscribe( x => {
    x.subscribe( y => { 
        // It worked, process output {a2:..,b2... etc
    }, errorY => {... errors for part 2 })
}, errorX => {... errors for part 1 })

I also tried:

forkJoin(
    a: getObservableA().pipe(map(processA(result.a)),
    b: getObservableB().pipe(map(processB(result.b)),
    c: getObservableC().pipe(map(processC(result.c))
).subscribe( x => ... )

But that gives an object like Observable<{a: Observable, b: Observable, c: Observable}> which means those processX calls aren't being subscribed to and executed.

I don't really need the data from any of the observables, just to fetch the data in group 1 and use it in group 2.

Is there a way to easily chain two forkJoins together which would give one error handler to catch any errors and to avoid nesting subscribes which I've been told is a bad pattern?

CodePudding user response:

Maybe you need a switchMap?

forkJoin([
  getObservableA(), 
  getObservableB(), 
  getObservableC()
]).pipe(switchMap(([r1, r2, r3]) => {
  return forkJoin([
    processA(r1), 
    processB(r2), 
    processB(r3)
  ])
})).subscribe(([r1, r2, r3]) => {
  
})

It would probably be better practice to chain the call to processA/B/C directly after each element inside the first forkJoin:

forkJoin([
  getObservableA().pipe(switchMap(processA)), 
  getObservableB().pipe(switchMap(processB)), 
  getObservableC().pipe(switchMap(processC))
]).subscribe(([r1, r2, r3]) => {

})

This will be faster since we don't have to wait for all the Observables to complete before beginning to proccess them.


You can then put an error handler for all errors inside the pipe() with catchError().

forkJoin([
  getObservableA().pipe(switchMap(processA)), 
  getObservableB().pipe(switchMap(processB)), 
  getObservableC().pipe(switchMap(processC))
]).pipe(catchError(err => {
  
})).subscribe(([r1, r2, r3]) => {

})

Alternatively, put it inside the subscribe() callback.

forkJoin([
  obs1.pipe(switchMap(processA)), 
  obs2.pipe(switchMap(processB)), 
  obs3.pipe(switchMap(processC))
]).subscribe(([r1, r2, r3]) => {

}, error => {
  
})
  • Related