I have an observable that will be taking in multiple real-time trade values (possibly many per second) from a SignalR hub. What I am trying to achieve is an observable that continuously (every 10 seconds) outputs a sample of 5 trades that occurred in those last 10 seconds.
I wrote an observable pipe to try to achieve this by adding all of the received trades into a buffer for 10 seconds, then creating an observable for each of the trades in the buffer array, using 'concatMap' and 'from'. Then, creating another buffer that collects 5 values, and emits them.
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
bufferTime(10000),
concatMap((tradeArray) => {
return from(tradeArray);
}),
bufferCount(5),
tap(v => console.log('pipe-end: ', v))
);
However, the pipe keeps emitting all of the values that it receives in the 10 second window, but in groups of 5. I tried adding a take(5)
in the pipe after the concat map, and it works correctly for the first batch of 5 values, but then the observable "completes" and stops listening for new values. I also tried adding a filter with index after the concatMap like this:
filter((v, i) => (i < 6 )),
This works for the first batch of 5 values, but then keeps filtering out every value, so a second buffer of 5 never gets created. Also this use case of the filter appears to be deprecated.
I'm not sure if I'm overlooking something obvious here, but I've looked at many of the rxjs operators and can't find a way to achieve this
CodePudding user response:
What about something like this,
let n = 5;
let t = 10;
//Source, emits a value every second (just a placeholder for real source)
const source = interval(1000);
//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));
//Every t=10 seconds switch to a new observable that emits n=5 values and then closes
const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
switchMap(() => takeNValues)
);
//Subscribe and log n=5 values every t=10 seconds
takeNValuesEveryTSeconds.subscribe(n =>
console.log('Value => ', n)
);
CodePudding user response:
bufferTime
has a maxBufferSize
argument that will do this for you.
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
bufferTime(10000, 10000, 5),
concatMap((tradeArray) => {
return from(tradeArray);
}),
tap(v => console.log('pipe-end: ', v))
);
You could also use windowTime
instead to output each value as soon as it's created, rather than waiting for all 5.
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
windowTime(10000, 10000, 5),
mergeAll()
tap(v => console.log('pipe-end: ', v))
);
These are covered in the documentation for bufferTime
and windowTime
respectively.