Home > OS >  Rxjs - Collects all unprocessed values
Rxjs - Collects all unprocessed values

Time:10-26

I want to use Subject has an async task queue with concurrency = 1.

Gathering new tasks is faster than performaning those tasks so, as a performance optimization, I would like to process all un-processed tasks until now.

The following code goes over task by task:

this.tasks
      .pipe(
        mergeMap(async task => {
          await this.processTasks([task])
        }, 1),
      )

I would like to covert the above code to something similar to:

this.tasks
      .pipe(
        mergeMap___AllUnprocessedUntilNow(async tasks => {
          await this.processTasks(tasks)
        }, 1),
      )
  • I can't use bufferTime or bufferCount because they will introduce additiol latency to every new task.

Actual Run Result:

* adding task 1 
* procssing task 1 (takes alot of time to perform it)
* meanwhile, adding more tasks: 2,3,4
* processed task 1
* procssing task 2
* processed task 2
* procssing task 3
* processed task 3
* procssing task 4
* processed task 4

Expected Run Result:

* adding task 1 
* procssing task 1 (takes alot of time to perform it)
* meanwhile, adding more tasks: 2,3,4
* processed task 1
* procssing task 2,3,4
* processed task 2,3,4

CodePudding user response:

I have a custom operator that does this. Heads up, there are a few bits that make this more complicated than you might first think. On top of that, this is a bit over-engineered but it sounds like you can use this with the defaults to get what you want.

Custom RxJS Operator: bufferedExhaustMap

function bufferedExhaustMap<T, R>(
  project: (v: T[]) => ObservableInput<R>,
  minBufferLength = 0,
  minBufferCount = 1,
  concurrent = 1
): OperatorFunction<T, R> {

  function diffCounter(){
    const incOrDec = new Subject<boolean>();
    return {
      onAvailable: incOrDec.pipe(
        scan((acc, curr) => (curr ?   acc : --acc), 0),
        startWith(0),
        shareReplay(1),
        first(count => count < concurrent),
        mapTo(true)
      ),
      start: () => incOrDec.next(true),
      end: () => incOrDec.next(false)
    };
  }

  return source => defer(() => {
    const projectCount = diffCounter();

    const shared = source.pipe(share());

    const nextBufferTime = () => forkJoin([
      shared.pipe(take(minBufferCount), delay(0)),
      timer(minBufferLength),
      projectCount.onAvailable
    ]);

    return shared.pipe(
      bufferWhen(nextBufferTime),
      delayWhen(() => projectCount.onAvailable),
      tap(projectCount.start),
      map(project),
      mergeMap(projected => from(projected).pipe(
        finalize(projectCount.end))
      )
    );
  });
}

Usage

Note, I've removed some superfluous promise/RxJS mingling as it's a code smell.

this.tasks.pipe(
  mergeMap___AllUnprocessedUntilNow(async tasks => {
    await this.processTasks(tasks)
  }, 1),
)

becomes

this.tasks.pipe(
  mergeMap___AllUnprocessedUntilNow(
    tasks => this.processTasks(tasks)
    , 1
  )
)

(remember that all high-order RxJS operators will turn iterables and promises into observables for you. Otherwise you can use the from operator instead).

Since bufferedExhaustMap has a default concurrency of 1, you can write this with the new operator as follows:

this.tasks.pipe(
  bufferedExhaustMap(
    tasks => this.processTasks(tasks)
  )
)

CodePudding user response:

You can accomplish this with a combination of buffer and concatMap, along with a little help from filter and tap.

The idea is to trigger the release of the buffer at the correct time. In your case there are two occasions to release:

  • When the first emission is received
  • After work has been completed
const release$ = new Subject<void>();
let releaseOnNextEmit = true;

const work$ = item$.pipe(
  tap(() => { 
    if(releaseOnNextEmit) setTimeout(() => release$.next(), 0);
  }),
  buffer(release$),
  tap(tasks => releaseOnNextEmit = tasks.length === 0),
  filter(tasks => tasks.length > 0),
  concatMap(tasks => processTasks(tasks)),
  tap(() => release$.next())
);

You can see we use a Subject to trigger the release of the buffer and a releaseOnNextEmit flag to indicate whether receiving an emission should trigger the buffer's release.

After the buffer emits, we set the flag accordingly:

  • true when buffer is empty
  • false when it is NOT empty

filter is used to prevent passing along empty arrays to concatMap (buffer will emit empty array if the buffer is empty when its trigger emits).

We use concatMap to execute the real work, then simply release the buffer afterwards.

Here's a working StackBlitz demo.


With all the taps in there, this isn't super easy to follow, so it may be beneficial to put it into a custom operator:

function bufferConcatMap<T, R>(project: (a: T[]) => ObservableInput<R>) {
  const release$ = new Subject<void>();
  let releaseOnNextEmit = true;

  return (source$: Observable<T>) => source$.pipe(
    tap(() => { 
      if(releaseOnNextEmit) setTimeout(() => release$.next(), 0);
    }),
    buffer(release$),
    tap(tasks => releaseOnNextEmit = tasks.length === 0),
    filter(tasks => tasks.length > 0),
    concatMap(project),
    tap(() => release$.next())
  );
}
const work$ = item$.pipe(
  bufferConcatMap(tasks => processTasks(tasks))
);

Here's another StackBlitz.

  • Related