Avoid subscribing in Angular Service using RxJS?

Time:11-30

I am working on an Angular project that needs to load pages and then display them one by one or two by two.

As per this article and some other sources, subscribing in services is almost never necessary. So is there a way to rewrite this in pure reactive style using RxJS operators?

Here's what I have (simplified):

export class NavigationService {

  private pages: Page[] = [];
  private mode = Mode.SinglePage;

  private index = 0;
  private currentPages = new BehaviorSubject<Page[]>([]);
  
  constructor(
    private pageService: PageService,
    private view: ViewService,
  ) {
    this.pageService.pages$.subscribe(pages => {
      this.setPages(pages);
    });

    this.view.mode$.subscribe(mode => {
      this.setMode(mode);
    });
  }

  private setPages(pages: Page[]) {
    this.pages = pages;
    this.updateCurrentPages();
  }

  private setMode(mode: Mode) {
    this.mode = mode;
    this.updateCurrentPages();
  }

  private updateCurrentPages() {
    // get an array of current pages depending on pages array, mode & index
    this.currentPages.next(...);
  }

  public goToNextPage() {
    this.index += 1;
    this.updateCurrentPages();
  }

  public get currentPages$() {
    return this.currentPages.asObservable();
  }
}

I've tried multiple solutions and didn't manage to get it right. The closest I got was using scan(), but it always reset my accumulated value when the outer observables (pages, mode) got updated.

Any help is appreciated, thanks !

CodePudding user response:

Let's first detail what you are doing:

  • You subscribe to pageService.pages$ and view.mode$.
  • Those subscriptions store the emitted values in private fields.
  • Each one then calls a function that uses those fields.
  • Finally, that function pushes a new value to currentPages.

All this can be done in a single pipeline. You also need to expose the index as an observable (a BehaviorSubject in our case) so the pipeline reacts to index changes too.

Use combineLatest: it subscribes to all the observables we pass in, emits once ALL of them have emitted at least once, and emits again every time any of them changes afterwards. You may want to add .pipe(startWith("something")) to observables that should have a default value, so that the pipeline emits as soon as possible.

When it is created with an object (supported in RxJS 7 and later), combineLatest emits an object holding the latest value of each source under the same key: here pages, mode and index. I've used switchMap to show the case where updateCurrentPages returns an observable, but you could use map if there is no async work to do.
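To make the timing rule concrete, here is a small dependency-free model of it. This is not RxJS: modelCombineLatest and the SourceEvent type are hypothetical names made up for this sketch, and the events are just an array processed in order.

```typescript
// Dependency-free model of combineLatest's timing rule. This is NOT RxJS itself.
type SourceEvent = { source: string; value: unknown };

function modelCombineLatest(
  sources: string[],
  events: SourceEvent[],
): Record<string, unknown>[] {
  const latest: Record<string, unknown> = {};
  const emissions: Record<string, unknown>[] = [];

  for (const event of events) {
    // Ignore events from sources that were not registered.
    if (sources.indexOf(event.source) === -1) continue;
    latest[event.source] = event.value;
    // Nothing is emitted until every source has produced at least one value.
    if (Object.keys(latest).length === sources.length) {
      emissions.push({ ...latest });
    }
  }
  return emissions;
}

const emissions = modelCombineLatest(
  ['pages', 'mode', 'index'],
  [
    { source: 'index', value: 0 },
    { source: 'mode', value: 'single' },
    { source: 'pages', value: ['p1', 'p2'] }, // first combined emission
    { source: 'index', value: 1 },            // second combined emission
  ],
);
console.log(emissions.length); // 2
```

The first two events produce nothing because pages has not emitted yet, which is why startWith on a slow source can matter.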

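import { BehaviorSubject, Observable, combineLatest, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

export class NavigationService {

  private index = new BehaviorSubject<number>(0);
  readonly currentPages$: Observable<Page[]>;

  constructor(
    private pageService: PageService,
    private view: ViewService,
  ) {
    this.currentPages$ = combineLatest({
      pages: this.pageService.pages$,
      mode: this.view.mode$,
      index: this.index,
    }).pipe(
      // switchMap because updateCurrentPages returns an observable here;
      // use map instead if it returns the array synchronously
      switchMap(({ pages, mode, index }) => this.updateCurrentPages(pages, mode, index)),
    );
  }

  private updateCurrentPages(pages: Page[], mode: Mode, index: number): Observable<Page[]> {
    // get an array of current pages depending on pages array, mode & index
    return of(...);
  }

  public goToNextPage() {
    // push the incremented index; combineLatest re-emits with the new value
    this.index.next(this.index.value + 1);
  }

}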
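Since the body of updateCurrentPages is elided, here is one hypothetical way the synchronous variant could look as a pure function that you would call inside map. The Page shape, the Mode.DoublePage member and the convention that index counts steps (not individual pages) are all assumptions made for this sketch, not something taken from the question.

```typescript
// Hypothetical stand-ins: the question does not show the real Page and Mode definitions.
interface Page { id: number }
enum Mode { SinglePage, DoublePage }

// Pure function: it only reads its arguments, so it is safe to call inside map().
// Assumption: index counts steps, so in double-page mode step 1 starts at page 3.
function selectCurrentPages(pages: Page[], mode: Mode, index: number): Page[] {
  const size = mode === Mode.DoublePage ? 2 : 1;
  const start = index * size;
  return pages.slice(start, start + size);
}

const demoPages: Page[] = [{ id: 1 }, { id: 2 }, { id: 3 }];
const single = selectCurrentPages(demoPages, Mode.SinglePage, 1);
const double = selectCurrentPages(demoPages, Mode.DoublePage, 1);
console.log(single.map((p) => p.id)); // [ 2 ]
console.log(double.map((p) => p.id)); // [ 3 ]
```

Because the function keeps no state of its own, the pipeline stays the single source of truth for pages, mode and index.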

Beware, though, of claiming that subscribe is never needed. You may still have a good reason to subscribe to some events. It is just that combining everything in a pipeline makes things easier to handle. In the example above, the source observables are unsubscribed from when the consumer of your service unsubscribes from currentPages$, so that is one thing less to handle.

Also note that if multiple consumers subscribe to this service's currentPages$, the pipeline is duplicated and the same work is done once for each subscriber. While this MAY be what you want, you will usually prefer to have everyone subscribe to the same "final" observable. This is easily done by adding share() or shareReplay(1) at the end of your pipeline. share() makes sure the same pipeline is reused for each new subscriber, who then receives new values from that point on. shareReplay(1) does the same but also emits the latest value immediately on subscribe (just like a BehaviorSubject); the 1 is the number of past values to replay.

Hope this helps! Once you master RxJS you'll see that things get easier and easier (see the difference in the amount of code!), but getting the hang of it takes a little time. Do not worry, just persevere and you'll get there. (Hint to get better: relying on outside variables/properties is the root of most trouble when handling pipelines.)

CodePudding user response:

You can use merge to turn each source observable into a stream of reducer functions. Each function updates one part of a state maintained by the service. They are passed along to the scan operator, which applies each reducer to the prior state. After the reducer has run, currentPages is set on the new state and that new state is returned.

export class NavigationService {

  private readonly relativePageChangeSubject = new Subject<number>();

  readonly state$ = merge(
    this.pageService.pages$.pipe(map((pages) => (vm) => ({ ...vm, pages }))),
    this.relativePageChangeSubject.pipe(map((rel) => (vm) => ({ ...vm, index: vm.index + rel }))),
    this.view.mode$.pipe(map((mode) => (vm) => ({ ...vm, mode })))
  ).pipe(
    startWith((s) => s), // if necessary, force an initial value to be emitted from the initial value in scan.
    scan((s, reducer) => {
      const next = reducer(s);
      // update currentPages on the next state here.
      return next;
    }, { currentPages: [], index: 0, mode: Mode.SinglePage, pages: [] }),
    shareReplay(1)
  )

  readonly currentPages$ = this.state$.pipe(
    map(x => x.currentPages),
    distinctUntilChanged()
  );

  constructor(private pageService: PageService, private view: ViewService) { }

  goToNextPage() {
    this.relativePageChangeSubject.next(1);
  }
}

Notes:

  • Instead of having a nextPage Subject, a more flexible relative-change subject is used, which modifies the index relative to its value in the prior state.
  • The currentPages$ observable isn't strictly necessary, as a consumer could just attach to the main state$ and map as needed. Feel free to make state$ private or remove currentPages$.
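To see why this approach does not reset the index when pages or mode change, the folding step can be traced without RxJS. The sketch below uses Array.prototype.reduce in place of scan; NavState, setPages and moveBy are hypothetical names, and the state is simplified to strings and a number.

```typescript
// Hypothetical, simplified state: pages are plain strings to keep the sketch short.
interface NavState { pages: string[]; index: number }
type Reducer = (state: NavState) => NavState;

// Each "source" contributes a function that changes only its own part of the state.
const setPages = (pages: string[]): Reducer => (s) => ({ ...s, pages });
const moveBy = (rel: number): Reducer => (s) => ({ ...s, index: s.index + rel });

// One possible order in which the merged sources might emit.
const emitted: Reducer[] = [
  setPages(['a', 'b']),
  moveBy(1),
  setPages(['a', 'b', 'c']), // pages change again, index must survive
];

const initial: NavState = { pages: [], index: 0 };
// reduce() applies each reducer to the prior state, as scan() does per emission.
const finalState = emitted.reduce((state, reducer) => reducer(state), initial);
console.log(finalState.index); // 1
```

The difference from scan is that reduce only returns the final state, whereas scan emits every intermediate state; the accumulation logic is the same.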