Home > Enterprise >  Unexpected behavior of RxJS operators zip, merge, forkJoin and concat
Unexpected behavior of RxJS operators zip, merge, forkJoin and concat

Time:10-21

In this StackBlitz ToDo application, I'm writing a simple 3 column presentation based on chaining observables.

In the AppComponent, my observables are declared and initialized as follows:

import { Component, OnInit } from '@angular/core';
import { forkJoin, map, merge, Observable, zip } from 'rxjs';
import { DataService } from './data.service';
import { Item } from './item';
import { State } from './state';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  protected allItems: Observable<Item[]>;
  protected pendingItems: Observable<Item[]>;
  protected inProgressItems: Observable<Item[]>;
  protected doneItems: Observable<Item[]>;
  protected cancelledItems: Observable<Item[]>;
  protected terminalItems: Observable<Item[]>;

  constructor(protected _dataSvc: DataService) {
    this.allItems = this._dataSvc.items.asObservable();
    this.pendingItems = this.allItems.pipe(
      map((data) => data.filter((p) => p.status === State.Todo))
    );
    this.inProgressItems = this.allItems.pipe(
      map((data) => data.filter((p) => p.status === State.Doing))
    );
    this.doneItems = this.allItems.pipe(
      map((data) => data.filter((p) => p.status === State.Done))
    );
    this.cancelledItems = this.allItems.pipe(
      map((data) => data.filter((p) => p.status === State.Cancelled))
    );
    // First option
    this.terminalItems = merge(this.doneItems, this.cancelledItems);
    // Second option
    //this.terminalItems = forkJoin([this.doneItems, this.cancelledItems]).pipe(
    //  map(d => d[0].concat(d[1])));
    //);
    // Third option
    //this.terminalItems = concat(this.doneItems, this.cancelledItems);
    // Fourth option
    //this.terminalItems = zip(this.doneItems, this.cancelledItems)
    //  .pipe(map(d => d[0].concat(d[1])));
  }
  
  ngOnInit(): void {
    this._dataSvc.initData();
  }
}

As you can see in the InMemoryDataService, the field allItems gets the following content

let items: Item[] = [
      {id:1, name: "Eat", status: State.Todo},
      {id:2, name: "Sleep", status: State.Todo},
      {id:3, name: "Code", status: State.Todo},
      {id:4, name: "Game", status: State.Doing},
      {id:5, name: "Swim", status: State.Done},
      {id:6, name: "Bike", status: State.Cancelled},
    ];

The RxJS operator decision tree guided me to use merge to initialize the observable terminalItems as follows:

this.terminalItems = merge(this.doneItems, this.cancelledItems);

In app.component.html, I expected the Terminal div, to contain items of status Cancelled and Done (i.e. Bike, Swim).

However, the Terminated column contains only cancelledItems. Changing the order of the arguments of the merge operator makes the doneItems appear, but not the cancelledItems.

Checking the documentation and trying alternatives (forkJoin, concat and zip), I tested how different operators produce terminated items and found the following:

  • forkJoin: Showed no elements in Terminated column. Check Second option commented in the code, which basically tests the solution proposed in this answer
  • concat: Depending on arguments order, shows either cancelledItems or doneItems
  • zip: Showed both cancelledItems and doneItems

Could someone explain why only zip is working as expected? although according to the documentation, merge looks like it must work!

CodePudding user response:

You need to use combineLatest since forkJoin only fires when all observables complete.

    this.terminalItems = combineLatest([
      this.doneItems,
      this.cancelledItems,
    ]).pipe(map(([a, b]) => [...a, ...b]));

Working stackblitz: https://stackblitz.com/edit/angular-ivy-1qguvv?file=src/app/app.component.ts

More info: https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest

You might prefer learnrxjs.io to the official documentation, I find it more helpful.

CodePudding user response:

Merge will merge the emissions of n Observables. So you get two Item[] values, every one which will replace the previous and your ui will render the names in that array.

You can see this behaviour when you add a delay(2000) to the doneItems which will make that emission come later and overwriting the render.

To get the combined values you will have to accumulate whichever items are already there with new emissions into one array using combineLatest

  • Related