Home > Software engineering >  How can I implement a refresh function for a data stream?
How can I implement a refresh function for a data stream?

Time:08-10

I want to implement a refresh function to update the current list items$ while simultaneously being able to await the refreshItems().

This is my current implementation that works:

private readStream = new Subject<T[]>();
readStream$ = this.readStream.asObservable();

getItems = (): Observable<T[]> => 
  this.get().pipe(tap((res) => this.readStream.next(res)));

items$ = merge(this.getItems(), this.readStream$).pipe(
  distinctUntilChanged(),
  share({ connector: () => new ReplaySubject(1) })
);

refreshItems = () => this.getItems();

As you might have noticed, items$ will start off with calling this.getItems(), but this also triggers the this.readStream$ later on, hence the distinctUntilChanged().

I want to be able to call refreshItems() from another location, and at the same time I want to be able to know when the call completes, which is possible now when I use refreshItems() in a higher-order mapping function. I did it this way becauseitems$ has a ReplaySubject for caching purposes. I'll get the previous value when I listen to items$.

Currently, I'm using merge(this.getItems(), this.readStream$) because I want to get the items immediately when the object gets created, but also want to 'update' it when the getItems() get executed.

Is there a way to implement it without distinctUntilChanged() or make it better suitable for extension and/or cleaner?

Edit: The following explains the reason behind the refreshItems(): I have two classes that contain each other (eg. Class A contains zero or more IDs of Class B and Class B contains an ID of Class A (one-to-many relationship)). When Class A gets created, one or more IDs of Class B gets assigned to it as well. I need a fresh list of Class B when Class A is created to update the UI (only show non-assigned classes). That's why I have to wait until the refreshItems() of Class B is completed. Because refreshItems() gets called when the creation of Class A is completed.

Edit 2: Changing the code of items$ to using skip() works as well:

items$ = merge(this.getItems().pipe(skip(1)), this.readStream$)
    .pipe(share({ connector: () => new ReplaySubject(1) })); 

CodePudding user response:

The question I have is why do you need the observable from calling refreshItems? Is it just so you can control the ui somehow? Whatever the reason, maybe it would be better to look at the problem from a different perspective, and track the state of your items, not just the items themselves. This might be able to cover your required scenarios.

private refreshItemsSubject = new Subject<void>();
/** Reusuable stream repeating the pattern used when updating the state. */
private getState$ = this.get().pipe(
  startWith({ state: 'loading' }),
  map((items) => ({ state: 'ready', items }))
);

readonly itemState$ = merge(
  this.getState$,
  this.refreshItemsSubject.pipe(switchMap(() => this.getState$)
).pipe(
  startWith({ })), // this is so there is an initial item.
  scan((acc, cur) => ({ ...acc, ...cur }), { state: 'ready', items: [] })
  share({ connector: () => new ReplaySubject(1) })
);

refreshItems = () => refreshItemsSubject.next();

In the example above itemState$ returns a stream of items, and the current state they're in. So that before this.get() makes a request a state of loading is emitted.

The emissions from getState$ (which are used to get the initial result and subsequent refresh results) don't emit the full state each time, just a partial. The current state is constructed at the end with the scan operator using object spread syntax. You can alternatively return functions that mutate the state if you desire more flexibility.

CodePudding user response:

Maybe: have one subject triggering the get, and serve the data stream in two flavors, shared (for generic consumer) and "raw" (for callers of refreshItems()).

sharedReadStream$: Observable<T>;
private _readStream$: Observable<T>;
private _refresher$: Subject<void> = new Subject();

constructor() {
    this._readStream$ = this._refresher$.pipe(switchMap(() => this.get()));
    this.sharedReadStream$ = this.readStream$.pipe(share());
    this.refreshItems();
}

refreshItems = () => {
    this._refresher$.next();
    return this._readStream$;
}
  • Related