I want to use a Subject as an async task queue with concurrency = 1.
Gathering new tasks is faster than performing those tasks, so, as a performance optimization, I would like to process all tasks that are still unprocessed at that moment in a single batch.
The following code goes over the tasks one by one:
this.tasks
  .pipe(
    mergeMap(async task => {
      await this.processTasks([task])
    }, 1),
  )
I would like to convert the above code to something similar to:
this.tasks
  .pipe(
    mergeMap___AllUnprocessedUntilNow(async tasks => {
      await this.processTasks(tasks)
    }, 1),
  )
- I can't use bufferTime or bufferCount because they will introduce additional latency to every new task.
Actual Run Result:
* adding task 1
* processing task 1 (takes a lot of time to perform)
* meanwhile, adding more tasks: 2,3,4
* processed task 1
* processing task 2
* processed task 2
* processing task 3
* processed task 3
* processing task 4
* processed task 4
Expected Run Result:
* adding task 1
* processing task 1 (takes a lot of time to perform)
* meanwhile, adding more tasks: 2,3,4
* processed task 1
* processing task 2,3,4
* processed task 2,3,4
CodePudding user response:
I have a custom operator that does this. Heads up, there are a few bits that make this more complicated than you might first think. On top of that, this is a bit over-engineered but it sounds like you can use this with the defaults to get what you want.
Custom RxJS Operator: bufferedExhaustMap
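import { ObservableInput, OperatorFunction, Subject, defer, forkJoin, from, timer } from 'rxjs';
import {
  bufferWhen, delay, delayWhen, finalize, first, map, mapTo,
  mergeMap, scan, share, shareReplay, startWith, take, tap
} from 'rxjs/operators';

function bufferedExhaustMap<T, R>(
  project: (v: T[]) => ObservableInput<R>,
  minBufferLength = 0, // minimum buffering time in ms (passed to timer)
  minBufferCount = 1,  // minimum number of source values per buffer
  concurrent = 1       // maximum number of projections running at once
): OperatorFunction<T, R> {
  // Counts the projections currently in flight
  function diffCounter() {
    const incOrDec = new Subject<boolean>();
    return {
      // Emits once as soon as fewer than `concurrent` projections are running
      onAvailable: incOrDec.pipe(
        // true = a projection started (+1), false = a projection ended (-1)
        scan((acc, curr) => (curr ? ++acc : --acc), 0),
        startWith(0),
        shareReplay(1),
        first(count => count < concurrent),
        mapTo(true)
      ),
      start: () => incOrDec.next(true),
      end: () => incOrDec.next(false)
    };
  }

  return source => defer(() => {
    const projectCount = diffCounter();
    const shared = source.pipe(share());
    // The buffer closes once enough values have arrived, the minimum
    // time has elapsed, and a projection slot is free
    const nextBufferTime = () => forkJoin([
      shared.pipe(take(minBufferCount), delay(0)),
      timer(minBufferLength),
      projectCount.onAvailable
    ]);
    return shared.pipe(
      bufferWhen(nextBufferTime),
      delayWhen(() => projectCount.onAvailable),
      tap(projectCount.start),
      map(project),
      mergeMap(projected => from(projected).pipe(
        finalize(projectCount.end))
      )
    );
  });
}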
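The counter is the fiddly part of the operator, so here is a minimal RxJS-free sketch of the same bookkeeping. The `InFlightCounter` class is hypothetical and only for illustration; it assumes `start` should add one running projection, `end` should remove one, and `onAvailable` should resolve once fewer than `concurrent` projections are running.

```typescript
// Hypothetical stand-in for diffCounter, written without RxJS.
class InFlightCounter {
  private count = 0;
  private waiters: Array<() => void> = [];

  constructor(private readonly concurrent = 1) {}

  // Resolves as soon as fewer than `concurrent` projections are running.
  onAvailable(): Promise<void> {
    if (this.count < this.concurrent) return Promise.resolve();
    return new Promise<void>(resolve => this.waiters.push(resolve));
  }

  // Call when a projection starts.
  start(): void {
    this.count++;
  }

  // Call when a projection ends; wakes everyone waiting for a free slot.
  end(): void {
    this.count--;
    if (this.count < this.concurrent) {
      const toWake = this.waiters;
      this.waiters = [];
      toWake.forEach(resolve => resolve());
    }
  }
}
```

With `concurrent = 1`, a caller waiting on `onAvailable()` while one projection is running is only woken after `end()` is called.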
Usage
Note, I've removed some superfluous promise/RxJS mingling as it's a code smell.
this.tasks.pipe(
  mergeMap___AllUnprocessedUntilNow(async tasks => {
    await this.processTasks(tasks)
  }, 1),
)
becomes
this.tasks.pipe(
  mergeMap___AllUnprocessedUntilNow(
    tasks => this.processTasks(tasks),
    1
  )
)
(Remember that all higher-order RxJS operators will turn iterables and promises into observables for you. Otherwise, you can use the from function to convert them yourself.)
Since bufferedExhaustMap has a default concurrency of 1, you can write this with the new operator as follows:
this.tasks.pipe(
  bufferedExhaustMap(
    tasks => this.processTasks(tasks)
  )
)
CodePudding user response:
You can accomplish this with a combination of buffer and concatMap, along with a little help from filter and tap.
The idea is to trigger the release of the buffer at the correct time. In your case there are two occasions to release:
- When the first emission is received
- After work has been completed
const release$ = new Subject<void>();
let releaseOnNextEmit = true;
const work$ = item$.pipe(
  tap(() => {
    if (releaseOnNextEmit) setTimeout(() => release$.next(), 0);
  }),
  buffer(release$),
  tap(tasks => releaseOnNextEmit = tasks.length === 0),
  filter(tasks => tasks.length > 0),
  concatMap(tasks => processTasks(tasks)),
  tap(() => release$.next())
);
You can see we use a Subject to trigger the release of the buffer and a releaseOnNextEmit flag to indicate whether receiving an emission should trigger the buffer's release.
After the buffer emits, we set the flag accordingly:
- true when the buffer is empty
- false when it is NOT empty
filter is used to prevent passing along empty arrays to concatMap (buffer will emit an empty array if the buffer is empty when its trigger emits).
We use concatMap to execute the real work, then simply release the buffer afterwards.
Here's a working StackBlitz demo.
With all the taps in there, this isn't super easy to follow, so it may be beneficial to put it into a custom operator:
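import { Observable, ObservableInput, Subject } from 'rxjs';
import { buffer, concatMap, filter, tap } from 'rxjs/operators';

function bufferConcatMap<T, R>(project: (a: T[]) => ObservableInput<R>) {
  const release$ = new Subject<void>();
  // When true, the next source emission schedules a buffer release
  let releaseOnNextEmit = true;

  return (source$: Observable<T>) => source$.pipe(
    tap(() => {
      if (releaseOnNextEmit) setTimeout(() => release$.next(), 0);
    }),
    buffer(release$),
    // An empty buffer means nothing is waiting, so re-arm the flag
    tap(tasks => releaseOnNextEmit = tasks.length === 0),
    filter(tasks => tasks.length > 0),
    concatMap(project),
    // Work finished: release whatever was collected in the meantime
    tap(() => release$.next())
  );
}

const work$ = item$.pipe(
  bufferConcatMap(tasks => processTasks(tasks))
);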
Here's another StackBlitz.
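If it helps to see the control flow without any operators, the same release logic can be sketched in plain TypeScript. This is only an illustration under stated assumptions: `createBatchingQueue` and its `add`/`idle` methods are hypothetical names, not part of RxJS, and the microtask yield merely stands in for the setTimeout(..., 0) used above.

```typescript
// Hypothetical helper, not part of RxJS: everything queued while a batch
// is being processed is handed to the handler as the next batch.
type BatchHandler<T> = (batch: T[]) => Promise<void>;

function createBatchingQueue<T>(handler: BatchHandler<T>) {
  let pending: T[] = [];
  let draining: Promise<void> | null = null;

  async function drain(): Promise<void> {
    // Yield once so tasks added synchronously together share the first batch.
    await Promise.resolve();
    while (pending.length > 0) {
      const batch = pending; // take everything collected so far
      pending = [];
      await handler(batch);  // concurrency of 1: wait before taking more
    }
    draining = null;         // idle again; the next add() restarts draining
  }

  return {
    // Queue a task; start draining if no batch is currently in progress.
    add(task: T): void {
      pending.push(task);
      if (draining === null) draining = drain();
    },
    // Resolves once the queue has nothing left to process.
    idle(): Promise<void> {
      return draining ?? Promise.resolve();
    },
  };
}
```

Adding task 1, then adding tasks 2, 3 and 4 while task 1 is still being processed, results in two handler calls: one with [1] and one with [2, 3, 4].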