RxJs observable to listen for 10 seconds, return only 5 of the received values, discard the rest, an

Time:02-24

I have an observable that will be taking in multiple real-time trade values (possibly many per second) from a SignalR hub. What I am trying to achieve is an observable that continuously (every 10 seconds) outputs a sample of 5 trades that occurred in those last 10 seconds.

I wrote an observable pipe to try to achieve this: it adds all of the received trades to a buffer for 10 seconds, creates an observable from the trades in each buffer array using 'concatMap' and 'from', and then feeds those into a second buffer that collects 5 values and emits them.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

However, the pipe keeps emitting all of the values that it receives in the 10 second window, just in groups of 5. I tried adding a take(5) in the pipe after the concatMap, and it works correctly for the first batch of 5 values, but then the observable "completes" and stops listening for new values. I also tried adding a filter with an index after the concatMap, like this:

filter((v, i) => (i < 6 )),

This works for the first batch of 5 values, but then keeps filtering out every value, so a second buffer of 5 never gets created. Also this use case of the filter appears to be deprecated.

I'm not sure if I'm overlooking something obvious here, but I've looked at many of the RxJS operators and can't find a way to achieve this.

CodePudding user response:

What about something like this:

import { interval } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';

const n = 5;
const t = 10;

// Source, emits a value every second (just a placeholder for the real source)
const source = interval(1000);

// Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));

// Every t=10 seconds, switch to a new observable that emits n=5 values and then completes
const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
  switchMap(() => takeNValues)
);

// Subscribe and log n=5 values every t=10 seconds
takeNValuesEveryTSeconds.subscribe(value =>
  console.log('Value => ', value)
);

CodePudding user response:

bufferTime takes a maxBufferSize argument (the third parameter, after bufferTimeSpan and bufferCreationInterval) that will do this for you.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000, 10000, 5),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        tap(v => console.log('pipe-end: ', v))
      );
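
To make it easier to reason about which trades survive the cap, here is a rough plain-TypeScript model of the outcome (no RxJS involved). It is only a sketch under a few assumptions: TimedTrade, atMs and firstNPerWindow are hypothetical names made up for this example, windows are assumed to start at time 0 and to be exactly windowMs long, and only the first maxPerWindow trades of each window are kept. As far as I can tell, the real operator also emits empty arrays for quiet windows and emits a full buffer as soon as it fills up; the model ignores both of those timing details.

```typescript
// Hypothetical shape for this sketch: a trade plus its arrival time.
interface TimedTrade {
  tradeId: string;
  atMs: number; // milliseconds since subscription (assumed)
}

// Model only: group trades into fixed windows and keep the first
// `maxPerWindow` of each window, discarding the rest.
function firstNPerWindow(
  trades: TimedTrade[],
  windowMs: number,
  maxPerWindow: number
): TimedTrade[][] {
  const byWindow: Record<number, TimedTrade[]> = {};
  for (const trade of trades) {
    const index = Math.floor(trade.atMs / windowMs);
    if (!byWindow[index]) {
      byWindow[index] = [];
    }
    // Once the window already holds `maxPerWindow` trades, later ones are dropped.
    if (byWindow[index].length < maxPerWindow) {
      byWindow[index].push(trade);
    }
  }
  // Return the non-empty windows in chronological order.
  return Object.keys(byWindow)
    .map(Number)
    .sort((a, b) => a - b)
    .map((index) => byWindow[index]);
}

// Seven trades in the first 10 seconds, two in the next 10 seconds.
const demoTrades: TimedTrade[] = [
  { tradeId: 't1', atMs: 1000 },
  { tradeId: 't2', atMs: 2000 },
  { tradeId: 't3', atMs: 3000 },
  { tradeId: 't4', atMs: 4000 },
  { tradeId: 't5', atMs: 5000 },
  { tradeId: 't6', atMs: 6000 },
  { tradeId: 't7', atMs: 7000 },
  { tradeId: 't8', atMs: 12000 },
  { tradeId: 't9', atMs: 15000 },
];

console.log(
  firstNPerWindow(demoTrades, 10000, 5).map((w) => w.map((t) => t.tradeId))
);
```

In this model t6 and t7 are dropped because the first window is already full, while the second window keeps both of its trades because it never reaches the cap.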

You could also use windowTime instead to output each value as soon as it's created, rather than waiting for all 5.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        windowTime(10000, 10000, 5),
        mergeAll(),
        tap(v => console.log('pipe-end: ', v))
      );

These are covered in the documentation for bufferTime and windowTime respectively.
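
Note that both variants keep the first 5 trades of each window. If "a sample" should instead mean 5 trades picked at random from everything received in the window, one possible alternative (not what the answer above proposes) is to buffer the whole window and pick from the array afterwards. The helper below is a minimal sketch of that picking step; sampleUpTo and its rng parameter are hypothetical names introduced for this example, and rng is only there so the behaviour can be made deterministic.

```typescript
// Hypothetical helper: pick up to `n` items at random from one window's buffer.
// `rng` must return a number in [0, 1); it defaults to Math.random.
function sampleUpTo<T>(
  items: readonly T[],
  n: number,
  rng: () => number = Math.random
): T[] {
  const pool = items.slice(); // copy, so the caller's array is not mutated
  const count = Math.min(Math.max(n, 0), pool.length);
  // Partial Fisher-Yates shuffle: only the first `count` slots need settling.
  for (let i = 0; i < count; i++) {
    const j = i + Math.floor(rng() * (pool.length - i));
    const tmp = pool[i];
    pool[i] = pool[j];
    pool[j] = tmp;
  }
  return pool.slice(0, count);
}

// A window with 8 trade ids yields 5 of them; a window with 3 yields all 3.
console.log(sampleUpTo(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], 5).length);
console.log(sampleUpTo(['a', 'b', 'c'], 5).length);
```

In the pipe this would presumably sit behind a plain bufferTime(10000), for example map(buffer => sampleUpTo(buffer, 5)), with the trade-off that every trade of the window is held in memory and nothing is emitted until the 10 seconds are over.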
