Home > database >  RxJs mergeAll(), scan() coninutes to accumulate
RxJs mergeAll(), scan() coninutes to accumulate

Time:02-11

I have this bit of code that is intended to accumulate features as they are added ot the features$$ observable.

  private features$$: BehaviorSubject<Observable<Feature<FeatureConfig>>> = new BehaviorSubject<
    Observable<Feature<FeatureConfig>>
  >(of());
  
  private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
    mergeAll(),
    scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
      acc.push(filter);
      return acc;
    }, [] as Feature<FeatureConfig>[])
  );

The object itself is not important, but they are added with an add function:

  add(feature: FeatureConfig): void {
    this.features$$.next(
      of({
        key: feature.friendlyId,
        value: feature
      })
    );
  }

and then publicly exposed via a an observable that is intended to filture the current features by a value:

  feature$ = (feature: string): Observable<FeatureConfig | null> => {
    return this.features$.pipe(
      map((features: FeatureConfig[]) => {
        return (
          features.find((featureConfig: FeatureConfig) => {
            return featureConfig.key === feature;
          })?.value ?? null
        );
      })
    );
  };

This seems to work fine except every time something calls the feature$(feature: string) it hits the acc.push(filter) line in the private features$ observable scan operator. Perhaps I misunderstood the pattern or how mergeAll/scan behave or I just am missing something obvious? My thought was it should just return the current value of whatever has been pushed to features$$ at any given time?

CodePudding user response:

Add the shareReplay(1) operator after the scan operator where features$ is defined:

private features$: Observable<Feature<FeatureConfig>[]> = this.features$$.pipe(
  mergeAll(),
  scan((acc: Feature<FeatureConfig>[], filter: Feature<FeatureConfig>) => {
    acc.push(filter);
    return acc;
  }, [] as Feature<FeatureConfig>[]),
  shareReplay(1)
);

This will prevent the Observable chain before the shareReplay operator to re-run every time there is a new subscription.

  • Related