Home > Net >  Unsure how to reset stream based on previous values
Unsure how to reset stream based on previous values

Time:01-26

I'm working on the following RxJs stream within an Angular app and I'm running into trouble resetting the values. I have multiple places in my application which emit values to these three observables within the combineLatest call here, such as when a user changes a filter setting, or updates the page via an input field. Additionally I have a lazy load function that ticks the page forward when the user gets near the bottom.

When name and filter update I want it to only return the most recent data from getContent, but when the page observable has a new value I want it to combine the previous data with the current one via the scan operator. The only problem I have is that I can't seem to figure out the best way to do this within scan because at that point it doesn't know what the current and previous values were within mergeMap from name and filter.

getContent(name, page filter) {
    return this.http
      .get(
        `${this.API_BASE}/${name}/${filter}/${page}`
)

The stream looks like the following:

this.results$ = combineLatest(
  this.dataService.getName(),
  this.dataService.getPage(),
  this.dataService.getFilter()
).pipe(
  mergeMap(([name, page, filter]) => {
     this.dataService.getContent(name, filter, page);
  }),
  scan(
    (
      acc,
      curr
    ) => {
      this.nextPage = curr[curr.length - 1].id;
      if (acc.length && curr.length) {
         return acc.concat(curr);
      }

      return acc;
    },
    []
  )
);

The template is just a div that gets looped over and updated with the async pipe, which I'd like to keep if possible. Is there a better way to handle this within a single stream, or a way to break this apart in a way where I can get it to do what I need it to?

CodePudding user response:

You can simply use combineLatest on your "criteria", then feed that to a switchMap (provides reset whenever criteria changes), then feed the criteria result to the observable of the getPage(). At that point, you have all 3 arguments needed to call getContent().

Something like this should work for you:

  results$ = combineLatest([
    this.dataService.getName(),
    this.dataService.getFilter(),
  ]).pipe(
    switchMap(([name, filter]) => this.dataService.getPage().pipe(
      startWith(undefined), // initially emit `undefined` becuase there's not a "next page" cursor for the first call.
      mergeMap(page => this.dataService.getContent(name, filter, page)),
      scan((acc, curr) => {
        this.nextPage = curr[curr.length - 1].id;
  
        if (curr.length) {
          return acc.concat(curr);
        }
  
        return acc;
      }, [])
    )),
  );

It's best if the observable stream doesn't rely on outside variables. (this.nextPage)

We can get around this by changing the shape we emit. Instead of emitting only the "results" we can also have it include the "nextPage" information as well:

  query$ = combineLatest([
    this.dataService.getName(),
    this.dataService.getFilter(),
  ]).pipe(
    switchMap(([name, filter]) => this.dataService.getPage().pipe(
      startWith(undefined),
      mergeMap(page => this.dataService.getContent(name, filter, page)),
      scan(
        (acc, curr) => ({ 
          results  : curr.length ? acc.results.concat(curr) : acc.results, 
          nextPage : curr[curr.length - 1]?.id
        }),
        { results: [] as Result[], nextPage: undefined })
    )),
  );
<ng-container *ngIf="query$ | async as query">

  <div *ngFor="let result of query.results">
    {{ result.label }}
  </div>

  <button *ngIf="query.nextPage" (click)="loadMore(query.nextPage)"> 
    Load More 
  </button>

</ng-container>

Here's a working StackBlitz demo.


Two last notes:

  1. If the getPage(), getFilter(), & getName() methods don't take any params, you can simply declare them as observables, rather than methods that return observables: page$, filter$, name$

  2. The data service is exposing multiple observables, so the component can subscribe to them and feed the emissions back to the service's own getContent() method. Since your data service maintains all of the observable sources, it would probably be simpler to declare the content as an observable in the service instead of in the component.

Check out this StackBlitz if you want to :-)

CodePudding user response:

If I understand the problem right, you may try something along these lines

// start creating a stream that is the result of combineLatest, but share it,
// so that any other stream that uses this stream as its upstream will
// use the same shared upstream
// name, page and filter notifications are enriched with a value that says
// whether scan has to be used or not downstream
this.streamsShared$ = combineLatest(
  this.dataService.getName().pipe(res => ({res: res, scan: false})),
  this.dataService.getPage().pipe(res => ({res: res, scan: true})),
  this.dataService.getFilter().pipe(res => ({res: res, scan: false}))
).pipe(
  share()
)

// then you create a stream for the cases where scan has not to be used
this.noScan$ = this.streamsShared$.pipe(
  filter(val => !val.scan),
  // if no scan has to be used, then you just return the value returned 
  // by getContent
  // I used concatMap rather than mergeMap because usually it is the best
  // operator to use with chains of http calls (but this is another story)
  concatMap(([name, page, filter]) => {
     this.dataService.getContent(name, filter, page);
  }),
  // then you just return the value received by getContent in a one-value array
  map(retVal => [retVal])
)

// then you create a stream for the cases where scan has to be used
this.useScan$ = this.streamsShared$.pipe(
  filter(val => !val.scan),
  concatMap(([name, page, filter]) => {
     this.dataService.getContent(name, filter, page);
  }),
  scan(
    (
      acc,
      curr
    ) => {
      this.nextPage = curr[curr.length - 1].id;
      if (acc.length && curr.length) {
         return acc.concat(curr);
      }

      return acc;
    },
    []
  )
)

// eventually you merge the 2 observables to get the final stream which
// has to be passed to the async pipe
this.results$ = merge(this.noScan$, this.useScan$)

Honestly this whole machinery seems a bit complicated to me, but what you are actually embedding in the final stream is a non banal state: the state of the last name, last filter and last page as well as the state of the chain of results of getContent. All this state is contained and managed by the stream without you having to keep it in instance variables.

  • Related