Home > Net >  how to gracefully combine two Observables inside RXJS pipe
how to gracefully combine two Observables inside RXJS pipe

Time:05-14

I'm building a log viewer with Angular.

When a user enters the page I need to load historical logs and start watching for new ones. User can filter logs with a simple form that emits a query object. Each time the query changes process is restarted (i.e. "old" results are evicted, new history is loaded and new "live" stream is started).

I can do it in two ways but I'm not happy with either.

The first one I find easier to understand but violates DRY principle.

const historicalLogs = this.querySubject
.pipe(
  debounceTime(250),
  tap(() => this.logs = []),
  switchMap(
    query => this.deviceLogsService.getLogsBefore(query, moment())
  )
);

const futureLogs = this.querySubject
.pipe(
  debounceTime(250),
  tap(() => this.logs = []),
  switchMap(
    query => timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
  )
);

merge(historicalLogs, futureLogs)
.subscribe(newLogs => {
  this.logs.push(...newLogs);
  this.scrollToVeryBottom();
});

The second doesn't violate DRY but I'm afraid that it will be hard to read/analyze in the future:

this.querySubject
  .pipe(
    debounceTime(250),
    tap(() => this.logs = []),
    switchMap(query => concat([
      this.deviceLogsService.getLogsBefore(query, moment()),
      timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
    ]).pipe(mergeAll()))
  )
  .subscribe(newLogs => {
    this.logs.push(...newLogs);
    this.scrollToVeryBottom();
  });

I'll appreciate any suggestions on implementing it in a more readable and elegant way.

CodePudding user response:

One way to simplify, is to create a couple functions that get your future and past logs:


private getHistoricalLogs(query) {
  return this.deviceLogsService.getLogsBefore(query, moment());
}

private pollForFutureLogs(query) {
  return timer(1000, 2000).pipe(
    switchMap(() => this.deviceLogsService.getLogsAfter(query, moment()))
  );
}

this.querySubject.pipe(
    debounceTime(250),
    tap(() => this.logs = []),
    switchMap(query => concat([
      this.getHistoricalLogs(query),
      this.pollForFutureLogs(query)
    ]))
  .subscribe(newLogs => {
    this.logs.push(...newLogs);
    this.scrollToVeryBottom();
  });

Instead of maintaining a separate logs variable "outside the stream", you could simply emit an empty array initially inside your switchMap and accumulate the emissions into a single array using scan:

logs = this.querySubject.pipe(
    debounceTime(250),
    switchMap(query => concat([
      of([]),
      this.getHistoricalLogs(query),
      this.pollForFutureLogs(query)
    ])),
    scan((all, logs) => all.concat(logs), [])
);

logs.subscribe(() => this.scrollToVeryBottom());

CodePudding user response:

Have you tried using combineLatest ?:

obs$.pipe(
  ...,
  switchMap(query => combineLatest([
    this.deviceLogsService.getLogsBefore(query, moment()),
    timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
  ])),
  map(([logsBefore, logsAfter]) => {...})
)
  • Related