Pass data from first operator in a pipe into two custom operators?

Time:10-02

I recently took over an Angular codebase with no access to the previous dev and am struggling to understand some of the RxJS used.

There is currently a two-step file upload process that I would like to replicate for the upload of streamable video. The upload process first makes a request to the API, which returns the form data needed for uploading files (and now videos). The current code combines the initial file upload request with the form data for every file in a mergeMap and makes the requests to S3. This works great for the files, but I can't seem to make the same happen for videos.

As far as I understand, I need to create a way to pass data from a map operator to two custom operators instead of one. I have tried to use a forkJoin inside a mergeMap to combine the processes but I kept running into problems with passing data from the initial map operator into both functions. Here is my code with the initial upload requests and the custom operator.

  addItem(courseCode: string, unitId: string, formValue: AddItemFormValue) {
    return this.courseApi.addItem(courseCode, unitId, body).pipe(
      map(data => data.data),
      this.makeFileUploadRequests(formValue),
      // Trying to add: this.makeVideoUploadRequests(formValue)
    );
  }

  private makeFileUploadRequests<T extends { files: { name: string; params: ServerModel.FileResponse }[] }>(formValue: {
    files: UploadingFile[];
  }) {
    return mergeMap<T, Observable<ServerModel.FileUploadedResponse[] | T>>(data => {
      return Array.isArray(data.files) && data.files.length > 0 ? this.uploadFiles(data.files, formValue.files) : of(data);
    });
  }

I think part of what makes this confusing is the syntax used for the custom operators. I have created a similar operator for uploading videos but how do I run both of them while making sure the stream coming from the map is passed onto both?

Answer:

As is so disappointingly often the case, the answer is, "It depends." Right now, addItem is returning an Observable. An Observable is basically a pipe that you can plug into that sometimes emits values. How should values be emitted from the addItem's Observable?

The General Answer

The general question you're asking is, "How do I pass an Observable's value through to 2 different operators?" And the general answer is: You create 2 pipes.

For example:

import { interval } from 'rxjs'
import { map } from 'rxjs/operators'

// Emits a number every second, which is multiplied by 10
const foo$ = interval(1000).pipe(
  map(v => v * 10)
)

const stringified$ = foo$.pipe(
  map(v => v.toString())
)

const percentage$ = foo$.pipe(
  map(v => v / 100)
)

The pipe function simply returns another Observable, so that Observable can be fed into another pipe. In your example, that means you could do this:

addItem(courseCode: string, unitId: string, formValue: AddItemFormValue) {
  const data$ = this.courseApi.addItem(courseCode, unitId, body).pipe(
    map(data => data.data)
  )

  const fileUpload$ = data$.pipe(this.makeFileUploadRequests(formValue))
  const videoUpload$ = data$.pipe(this.makeVideoUploadRequests(formValue))

  return // We're getting to this...
}
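
One caveat with the two-pipe approach: if courseApi.addItem returns a cold Observable (as Angular HttpClient calls do; that is an assumption here, since the service code isn't shown), every pipe that ends up subscribed will trigger its own request. The sketch below illustrates the problem and one possible fix using shareReplay(1). fakeAddItem and the response shape are hypothetical stand-ins, not the real API.

```typescript
import { defer, of } from "rxjs";
import { map, shareReplay } from "rxjs/operators";

let requestCount = 0;

// Hypothetical stand-in for this.courseApi.addItem(...): a cold Observable
// that "sends a request" (increments a counter) on every subscription.
const fakeAddItem = () =>
  defer(() => {
    requestCount++;
    return of({ data: { files: ["a.pdf"], videos: ["b.mp4"] } });
  });

// Without sharing: two pipes, two subscriptions, two requests.
const cold$ = fakeAddItem().pipe(map(res => res.data));
cold$.pipe(map(d => d.files)).subscribe();
cold$.pipe(map(d => d.videos)).subscribe();
const coldRequests = requestCount; // 2

// With shareReplay(1): the response is cached and replayed to the second pipe.
requestCount = 0;
const shared$ = fakeAddItem().pipe(
  map(res => res.data),
  shareReplay(1)
);
shared$.pipe(map(d => d.files)).subscribe();
shared$.pipe(map(d => d.videos)).subscribe();
const sharedRequests = requestCount; // 1
```

In addItem, that would mean adding shareReplay(1) after the map in data$, assuming you want the item to be created only once.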

The Conditional Part

The function addItem currently returns an Observable that emits some values, and to keep that consistent we need to return a single Observable that emits some values as well. So now we get to the part where it depends, and what it depends on is this: What values should the Observable returned by addItem emit?

There are quite a few options, but here are some of the ones you might consider:

Option 1: The data$ observable should pass info to both makeFileUploadRequests and makeVideoUploadRequests no matter what, and the resulting Observable should emit all values emitted by either operator.

In this case, you're going to want to return a new Observable using the merge function (mergeWith is the pipeable version, used as fileUpload$.pipe(mergeWith(videoUpload$))):

return merge(fileUpload$, videoUpload$)

Option 2: The data$ observable should pass info to either makeFileUploadRequests or makeVideoUploadRequests based on some filter, and the resulting Observable should emit only values emitted by whichever one of the two was used.

This is probably the simplest, because it's a straight-up ternary:

return unitId === someConditionOrWhatever ? fileUpload$ : videoUpload$

Option 3: The data$ observable should pass info to both makeFileUploadRequests and makeVideoUploadRequests no matter what, and the resulting Observable should emit only after both of the other Observables have completed.

You're going to want to use forkJoin, which waits until all of the Observables passed to it have completed and then emits a single array containing the last value from each:

return forkJoin([fileUpload$, videoUpload$])

Option 4: The data$ observable should pass info to both makeFileUploadRequests and makeVideoUploadRequests no matter what, and the resulting Observable should emit only after both of the other Observables have emitted at least once.

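Your best friend here is combineLatest, which emits the latest value from each as an array, and emits again whenever either one produces a new value: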

return combineLatest([fileUpload$, videoUpload$])
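
To see how Options 1, 3, and 4 differ, here is a small sketch that runs the same two streams through merge, forkJoin, and combineLatest. The two of(...) sources are hypothetical stand-ins for fileUpload$ and videoUpload$; they emit synchronously, so real upload timing will differ.

```typescript
import { combineLatest, forkJoin, merge, of } from "rxjs";

// Hypothetical stand-ins: the file stream emits once, the video stream twice.
const fileUpload$ = of("files done");
const videoUpload$ = of("video 50%", "video done");

// Option 1: every value from either stream, as it arrives.
const merged: string[] = [];
merge(fileUpload$, videoUpload$).subscribe(v => merged.push(v));
// merged: ["files done", "video 50%", "video done"]

// Option 3: one emission after both complete, holding the last value of each.
const joined: string[][] = [];
forkJoin([fileUpload$, videoUpload$]).subscribe(v => joined.push(v));
// joined: [["files done", "video done"]]

// Option 4: emits once both have a value, then again on each new value.
const combined: string[][] = [];
combineLatest([fileUpload$, videoUpload$]).subscribe(v => combined.push(v));
// combined: [["files done", "video 50%"], ["files done", "video done"]]
```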

There are a couple dozen operators in RxJS dedicated to merging 2 Observables depending on how you want to do it and what the result should be, but hopefully that gives you a good idea.

TL;DR: You don't feed it into 2 different operators in a single pipe, you end that pipe and create 2 new ones that plug into it.
