Home > Enterprise >  Rxjs merge two observables with nested array
Rxjs merge two observables with nested array

Time:01-25

I am using angular *ngFor to display a list of users from an endpoint. Now I want to update the picture of each user after rendering the list. But the API to get the picture gets the picture by user Id. So I have to iterate the list and load the image for each user. I am not sure which RXJS operator I can use to achieve this and also how to update the nested users list array.

Get users list data response - Observable<IUsersList> 1

{
   "message":"Success",
   "data":{
      "users":[
         {
            "id":1,
            "firstname":"Bill",
            "dateAdded":"2022/02/01",
            "picture":"assets/default.png"
         },
         {
            "id":2,
            "firstname":"Steve",
            "dateAdded":"2022/02/01",
            "picture":"assets/default.png"
         }
      ],
      "totalPages":1,
      "totalElements":2,
      "last":true
   }
}

Get picture by id response - Observable<IPicture>

{
   "id":1,
   "picture":"remote.link/bill.png"
}

Here's my attempt, but it's failing. The users observable changes to picture observable when I merge it. So in the subscribe next function there's errors.

this.userService.getUsers(params).pipe(
mergeMap(res => res.data.users.map(user => this.pictureService.getById(user.id)) )
).subscribe({
    next: res => {
      this.list = res.data.users;
      this.isLoading = false;
      this.total = res.data.totalElements;
    },
    error: () => {
      this.isLoading = false;
    },
})

Kindly help.

CodePudding user response:

I would use a mixture of switchMap and forkJoin to merge the results of all your requests:

ngOnInit(): void {

  this.pictureService.getUsers(params)
    .pipe(
      switchMap((userListObj) => {
        // Create a list of observables that will result in completed User objects:
        const obs = userListObj.data.users.map((user) => this.extendUser(user));        
        return forkJoin([of(userListObj), forkJoin(obs)]);
      }),
      // Add the completed User-objects to the IUsersList wrapper:
      map(([userListObj, users]) => ({...userListObj, data: {...userListObj.data, users: users }})),
      tap((res) => {
        this.list = res.data.users;
        this.total = res.data.totalElements;
      }),
      finalize(() => (this.isLoading = false))
    )
    .subscribe();
}

extendUser(user: User) {
  // Assign the updated picture-url, which is fetched via 'getById()'
  return this.pictureService.getById(user.id).pipe(
    map((p) => ({ ...user, picture: p.picture }))
  );
}

CodePudding user response:

If you don't mind a delay from you can do this with switchMap and concatAll or mergeAll. Both concat and merge will execute the inner observables one by one, but concat will preserve the order where merge will run everything at once and not care about order.

this.userService.getUsers(params).pipe(
  map(res => res.data.users.map(user => this.pictureService.getById(user.id).pipe(
     map(img => ({...user, picture: img.picture }))
  ))),
  concatAll(),
  toArray()
)

The problem here is that this could be slow. You could lazy load the images and update the array as images are loaded.

getUsersRequestSubject = new Subject<GetUserParams>();

private getUserData$ = getUsersRequestSubject.pipe(
  switchMap(p => this.userService.getUsers(p)),
  map(users => users.map(x => ({ ...x, picture: '/assets/notfound.png' }))),
  shareReplay()
);

/** Return each image individually. */
private getImage$ = getUserData$.pipe(
  map(users => users.map(user => this.pictureService.getById(user.id))),
  concatAll()
);

users$ = merge(
  this.getUserData$.pipe(map(users => (_) => users)),
  this.getImage$.pipe(map(img => (users) => {
    users.find(x => x.id === img.id)?.picture = img.picture;
    return users;
  }))
).pipe(
  startWith((_) => []), // optional if you want an initial value.
  scan((users, reducer) => reducer(users), [])
);

Here's what's happening:

  • When the user list needs to be updated call geUsersRequestSubject.next(params).
  • Once that happens getUserData$ executes (as long as user$ is subscribed to). This retrieves the users and sets the picture value to a not found image.
  • getImage$ will use the result of getUserData$ to emit picture data one by one to.
  • user$ will use a merge of observables that return reducer functions. The one from getUserData$ will replace the users array, while the one from getImage$ will update the users array with the image.
  • These reducers are executed by the scan function, which will pass the prior state into the reducer function to get the next state.

If you haven't gathered what will happen is that the user list will update with the result of the user api call immediately with each user having a not found image. Then as the api calls to retrieve images execute, each not found image will get replaced.

  • Related