Home > Net >  RXJS, Is there a way to wait inside an operator for a subscription to end before continuing the work
RXJS, Is there a way to wait inside an operator for a subscription to end before continuing the work

Time:01-10

So, i am new to RXJS, and i have checked a lot of stackoverflow and documentation before coming here and asking this, but i'm finding a hard time to make my logic work.

I have an Observable that will fetch a collection of documents and return them, and i use the pipe operator to make some changes, like using the map operator to change the object. So far, everything is fine.

The problem is here. Afterward, i need to run an "http request" for every document, in order to get specific data about them ("tags"). The http request is of course made as an Observable too, that needs to get subscribed on to fetch the data. However, the subscription takes some time, and the resulting object afterward doesn't have the required data.

let myFunction.pipe(
      // mapping to add missing data needed for the front-end
      map((results) => ({
        ...results,
        documents: results._embedded.documents.map((document) => ({
          ...document,
          tags: []
        })),
      })),
// mapping to loop through each document, and use the observable to get the tags with the document id
      map((results) => {

        let documents = results.documents.map((document: Document) => {
          // get Tags for each document
          let tagsToReturn = []
          this.getDocumentTags(document.id)
            .pipe(
       // map function to return only the ids for each document, and not the complete tag object
              map((tagsArray) => {
                const modifiedTagsArray = tagsArray.map((tagObject: any) => {
                  if (tagObject !== undefined) {
                    return tagObject.id
                  }
                })
                return modifiedTagsArray
              })
            )
              // the subscription to "actually" get the tags
            .subscribe((tagsArray: number[]) => {
              // Here the tags are found, but the latter code is executed first
              // document.tags = tagsArray
              tagsToReturn = tagsArray
            })

          // console.log(JSON.stringify(document))
          // Here the tags are not found yet
          console.log(JSON.stringify(tagsToReturn))

          return { ...document, tags: tagsToReturn }
        })

       // I then, normally return the new documents with the tags for each document, but it is empty because the subscribe didn't return yet.
        return {
          _links: results._links,
          page: results.page,
          documents: documents,
        }
      }),
      map((results) => {
        results.documents.forEach((doc) => {
          return this.addObservablesToDocument(doc)
        })
        return results
      })
    )

I have tried some solutions with switchmap, forkjoin, concat...etc but it didn't work, or i didn't find the correct way to use them. This is why i'm asking if there is a way to stop or another way to handle this problem.

I have tried using different operators like: mergemap, concat, switchmap to swich to the new request, but afterward, i can't have the global object.

I mostly tried to replicate/readapt this in some ways

CodePudding user response:

By using mergemap combined with forkjoin, i was able to replicate what you were looking for.

Not really sure of how i can explain this, because i'm also not an expert coming to Rxjs, but i used the code from : this stackoverflow answer that i adapted

How i understand it is that, when using mergeMap in the pipe flow, you make sur that everything that get returned there, will be executed by the calling "subscribe()",then the mergeMap returns a forkJoin which is an observable for each document tags

I hope this can help

.pipe(
      // mapping to add missing data needed for the front-end
      map((results) => ({
        ...results,
        documents: results._embedded.documents.map((document) => ({
          ...document,
          tags: []
        })),
      })),

  /********  Added Code *********/
      mergeMap((result: ResultsNew<Document>) => {
        let allTags = result._embedded.documents.map((document) =>
          this.getDocumentTags(document.id).pipe(
            map((tagsArray) => tagsArray.map((tagObject: any) => tagObject.id))
          )
        )
        return forkJoin(...allTags).pipe(
          map((idDataArray) => {
            result._embedded.documents.forEach((eachDocument, index) => {
              eachDocument.tags = idDataArray[index]
            })
            
            return {
              page: result.page,
              _links: result._links,
              documents: result._embedded.documents,
            }
          })
        )
      }),
        /********  Added Code *********/

      map((results) => {
        results.documents.forEach((doc) => {
          return this.addObservablesToDocument(doc)
        })
        return results
      })
    )
  • Related