Home > Mobile >  angular handle multiple dependent subscriptions
angular handle multiple dependent subscriptions

Time:09-28

Any help?

  let notificationsMessages = []
  countries.forEach((country: any) => {
      this.isActiveCountry(country.isActive).subscribe((data) => { // // CALL #1 TO API
        country.serverId = data.serverId;
        this.uploadPhotoToApi(country.fileSource).subscribe((response) => { // CALL #2 TO API
          // server return the uploaded file ID
          country.serverFileID = response.serverFileId;
          this.sendCountryToApi(country).subscribe((response) => { // CALL #3 TO API
            this.countriesTable.delete(country.id).then(() => {
              // Delete the uploaded country from local database
              // if is the last country EMIT EVENT
            }, (error) => {
              // if is the last country EMIT EVENT
              notificationsMessages.push(error); // Error to delete country from indexedDB 
            });
          }, (error) => {
              // if is the last country EMIT EVENT
              notificationsMessages.push(error); // // Error to upload country to API
          });
        }, (errorCode) => {
              // if is the last country EMIT EVENT
          notificationsMessages.push(error); // Error on sending file to API
        });
      }, (error) => {
              // if is the last country EMIT EVENT
            notificationsMessages.push(error); // // Error on country identification
      });
  });
  

How can I emit an event when all the "country" list are processed ? And I need to know how many countries were uploaded with success and how many not

For example, if I have a list of 50 countries, when the last one is processed I want to emit an event with 2 arrays ... something like this: Succes: [countryId1, countryId2...] Errors: ['Country Id 2 failed on upload', 'Country Id 10 failed on file upload']

All those 3 calls are dependent and must be executed in that order... and I cannot change this flow Should I emit the event on CALL #3 success and also on all the errors functions? Thanks!

CodePudding user response:

Try avoiding the temptation of nesting many .subscribe(s inside of each other. As @praveen-soni mentioned, switchMap can help with that.

Then to get the status when all countries have been processed, I think forkJoin is perfect for that: It takes in a list of observables, and will emit once all of them complete.

How to build the list of observables? You initially have a list of countries, so you can just map each country to the observable that processes that country. We can also use one single catchError so that the error doesn't shut down the whole stream, but only the one for that specific country.

I think it would look something like:

const result$ = forkJoin(
  countries.map((country) =>
    this.isActiveCountry(country.isActive).pipe(
      switchMap((data) => {
        country.serverId = data.serverId;
        return this.uploadPhotoToApi(country.fileSource);
      }),
      switchMap((response) => {
        country.serverFileId = response.serverFileId;
        return this.sendCountryToApi(country);
      }),
      switchMap(() => {
        return this.countriesTable.delete(country.id);
      }),
      map(() => {
        // Everything went OK, map it to an OK status
        return {
          type: "success",
        };
      }),
      catchError((error) => {
        // If any step fails, map it to an error type
        return of({
          type: "error",
          error,
        });
      }),
      take(1) // Make sure the observable completes
    )
  )
);

// result$ can now be used as any other observable
result$.subscribe(result => {
  console.log(result);
})

CodePudding user response:

Here's one way to do this. This might be overkill as it gives you a lot of granular control over error handling, but then basically always handles errors the same way.

Even so, this might be easier to expand on than the most straightforward solution.

Here:

interface TaggedCountry{
  success: boolean,
  country: any,
  error?: any
}

class ArbiratryClassName {

  processCountry(country: any): Observable<TaggedCountry>{

    return this.isActiveCountry(country.isActive).pipe(
      // country now has serverId set
      map(({serverId}) => ({...country, serverId})),
      catchError(error => throwError(() => ({
        success: false,
        country,
        error
      }) as TaggedCountry)),

      mergeMap((resultCountry: any) => this.uploadPhotoToApi(resultCountry.fileSource).pipe(
        // country now has serverFileId set
        map(({serverFileId}) => ({...resultCountry, serverFileId})),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      mergeMap((resultCountry: any) => this.sendCountryToApi(resultCountry).pipe(
        // Ignore response from sendCountryToApi
        mapTo(resultCountry),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      mergeMap((resultCountry: any) => from(this.countriesTable.delete(resultCountry.id)).pipe(
        // Ignore response from countriesTable.delete
        mapTo(resultCountry),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      map((resultCountry: any) => ({
        success: true,
        country: resultCountry
      }) as TaggedCountry),

      // Convert errors into regular emissions
      catchError((tagged:TaggedCountry) => of(tagged))
    );
  }

  processCountries(countries: any[]): Observable<{success: TaggedCountry[], errors: TaggedCountry[]}>{
    return forkJoin(countries.map(c => this.processCountry(c))).pipe(
      map((tagged: TaggedCountry[]) => ({
        success: tagged.filter(tag => tag.success),
        errors: tagged.filter(tag => !tag.success)
      }))
    )
  }

  doSomethingWith(countries: any[]): void {
    this.processCountries(countries).subscribe({
      next: countries => console.log("All countries processed. Result: ", countries),
      complete: () => console.log("There's only one emission, so this should get called immediately after .next() was called"),
      error: err => console.log("This is a surprise, is there an error we didn't catch earlier? Error: ", err)
    })
  }
}

In case seeing the same thing done differently is helpful, here's a shorter implementation of processCountry

processCountry(country: any): Observable<TaggedCountry>{

  return this.isActiveCountry(country.isActive).pipe(
    tap((res:any) => country.serverId = res.serverId),

    switchMap(_ => this.uploadPhotoToApi(country.fileSource)),
    tap((res:any) => country.serverFileId = res.serverFileId),

    switchMap(_ => this.sendCountryToApi(country)),
    switchMap(_ => this.countriesTable.delete(country.id)),

    // Tag our result as a success
    map(_ => ({
      success: true,
      country
    }) as TaggedCountry),

    // Tag our errors and convert errors into regular emissions
    catchError(error => of(({
      success: false,
      country,
      error
    }) as TaggedCountry))
  );
}
  • Related