Home > Mobile >  ConcatMap is not executed in Rxjs
ConcatMap is not executed in Rxjs

Time:04-27

I am writing a file upload component. At first I check if a file exists and if so I prompt a user to either choose to save a copy or overwrite the existing file. After the existence check I want to upload the file, but if the user chooses to save as a copy the existence check has to be done again in a recursive manner. This means that it could and should happen that multiple observables are chained and executed one after another.

But here is the bug: The observable I return in the expression-function is not executed, although it is chained by the use of the concatMap pipe.

Why is this not behaving the way I intended it to behave?

The code looks like this:

  onDocumentsUploaded(documents: CustomDocument[]) {
    console.log(documents)
    documents = documents.map(document => {
      return {
        ...document,
        fullName: document.name   "."   document.extension
      }
    });

    for (let document of documents) {
      this.sub.add(
        this.handleUpload(document).subscribe()
      );
    }
  }

  private handleUpload(document: CustomDocument): Observable<CustomDocument[]> {
    return this.documentService.checkIfExists(document).pipe(
      concatMap(exists => this.expression(document, exists))
    );
  }

  private expression(document: CustomDocument, exists: boolean): Observable<CustomDocument[]> {
    let subject$ = new Subject<CustomDocument[]>();
    if (exists) {
      this.confirmationService.confirm({
        header: this.translate.instant("documentUpload.confirmation.header"),
        message: this.translate.instant("documentUpload.confirmation.message", { fileName: "'"   document.fullName   "'" }),
        icon: 'pi pi-info-circle',
        accept: () => {
          subject$.pipe(
            concatMap(_ => this.handleUpload({
              ...document,
              name: document.name   " (Copy)",
              fullName: document.name   "(Copy)"   "."   document.extension
            }).pipe(
              tap(x => console.log("handleUpload"))
            ))
          );
        },
        reject: () => {
          subject$.pipe(
            concatMap(_ => this.documentService.upload(document))
          );
        }
      });
      return subject$.asObservable();
    }
    else {
      return this.documentService.upload(document);
    }
  }

CodePudding user response:

In the function passed to accept, you are pipeing the subject$ though the concatMap, but you are ignoring the result observable.

Note that calling the .pipe() does not modify the original observable (or subject) in any way, but rather creates a new observable with the operators applied.

I think that what you'd like to do here is to return the result of the subject$.pipe(...) instead of the subject$.asObservable().

CodePudding user response:

Based on your comment that the confirmationService.confirm is thread blocking, the use of the Subject in this case in unnecessary.

You can do something like this instead.

private handleUpload(document: CustomDocument): Observable<CustomDocument> {
  return this.documentService.checkIfExists(document).pipe(
    concatMap((exists) => 
      exists
      ? this.confirmAction(document) 
      : this.documentService.upload(document)
    )
  );
}

private confirmAction(document: CustomDocument): Observable<CustomDocument> {
  //variable to save the 'result' of the confirm method      
  let action$: Observable<CustomDocument>; 

  this.confirmationService.confirm({
    header: this.translate.instant("documentUpload.confirmation.header"),
    message: this.translate.instant("documentUpload.confirmation.message", { fileName: "'"   document.fullName   "'" }),
    icon: 'pi pi-info-circle',
    accept: () => {
      action$ = this.handleUpload({
        ...document,
        name: document.name   " (Copy)",
        fullName: document.name   "(Copy)"   "."   document.extension
      })
    },
    reject: () => {
      action$ = this.documentService.upload(document))
    }
  }  
    
  // since the confirm() is thread-blocking, the following return statement
  // won't be executed until the confirm method completes.  
  return action$; 
}

Btw the Subject example wasn`t working for two main reasons.

  1. As @kszksz pointed out in his comment, subject$.pipe() does not modify the original subject$, it creates a new observable. So you were returning an incorrect source.
  2. In order to emit a notification in a Subject you need to call to its method next with the value. (ie. subject$.next(value))

But as I pointed out at the beginning in this case its use is not required and just overcomplicate the code.

Cheers

  • Related