Rx.NET - Set capacity and drop oldest

Time:06-03

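System.Threading.Channels allows us to specify a capacity and a full mode of DropOldest. Basically, when the channel is full and the consumer spends 10 seconds processing a message, every item written during those 10 seconds evicts the oldest item still waiting in the buffer, so only the most recent items (up to the capacity) survive.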

using System.Threading.Channels;

// Capacity 1: at most one item waits in the buffer at any time.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.DropOldest
});
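
To make the DropOldest behavior concrete, here is a minimal sketch that writes five items into a capacity-1 channel while nothing is reading, and then drains what is left. The class and method names (DropOldestDemo, Run) are made up for illustration; the channel calls are the standard System.Threading.Channels API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

public static class DropOldestDemo
{
    // Writes 1..5 with no reader running, then returns whatever is still buffered.
    public static List<int> Run()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        for (int i = 1; i <= 5; i++)
        {
            // In DropOldest mode TryWrite succeeds even when the channel is full:
            // the oldest buffered item is evicted to make room for the new one.
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        var remaining = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            remaining.Add(item);
        }
        return remaining;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "5"
    }
}
```

Only the last item survives, because each write replaced the single buffered item before any reader got to it.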

Is there a way to do that with Rx?

CodePudding user response:

To add to @Theodor Zoulias's answer, Channels can become "full" because you write into them on one end and read from them on the other. Writing and reading are separate operations, and the buffer in between is an explicit component with well-defined behavior that you control.

Observables are supposed to propagate every notification immediately, so there is generally no holding area. Some Rx operators add one as part of their processing, but if this level of separation between something being read and something being written is important to you, maybe the Channels abstraction is closer to your needs than Observables.
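
If you want to keep an observable as the source but still get the channel's lossy buffer, one option is to forward the notifications into the channel and read from it on the consuming side. The following is only a sketch under that assumption, not an established Rx pattern; it needs the System.Reactive package, and the names ObservableToChannelDemo and Run are hypothetical. A Subject<int> stands in for whatever observable you actually have.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading.Channels;

public static class ObservableToChannelDemo
{
    public static List<int> Run()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        // Stand-in for the real source observable.
        var source = new Subject<int>();

        // Forward every notification into the channel. The observable never waits:
        // when the buffer is full, the channel discards its oldest item.
        using var subscription = source.Subscribe(
            x => channel.Writer.TryWrite(x),
            ex => channel.Writer.TryComplete(ex),
            () => channel.Writer.TryComplete());

        // Three items arrive before the consumer reads anything.
        source.OnNext(1);
        source.OnNext(2);
        source.OnNext(3);
        source.OnCompleted();

        // The consumer only finds the newest item.
        var received = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            received.Add(item);
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "3"
    }
}
```

In a real application the consumer would typically loop over channel.Reader.ReadAllAsync() on its own task instead of draining synchronously as this sketch does.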

CodePudding user response:

Rx observables have no notion of "being full". An observable sequence is not a store of messages, like a Queue<T> or a Channel<T> is. It's just a generator/propagator of messages. Some Rx operators have internal queues in order to perform their work, like the Concat and the Zip operators for example. Generally these queues are hidden, and cannot be configured to be "lossy".

An Rx component that might have the functionality that you are looking for is the ReplaySubject<T>. This component can be configured with the maximum number of messages that it can replay (int bufferSize), and with the maximum duration that it can store each message before discarding it (TimeSpan window). If you set the bufferSize but not the window, the ReplaySubject<T> will eventually buffer the specified number of items, and then the buffer will retain the same size forever. Each incoming message will cause the oldest buffered message to be dropped. A ReplaySubject<T> is not a consumable queue like the Channel<T>. It is always ready to propagate all the messages in its buffer, to any new subscribers that might come by in the future.
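
As a rough illustration of the difference, the sketch below pushes four items into a ReplaySubject<int> with a bufferSize of 2 and then subscribes twice. It requires the System.Reactive package; the names ReplayDemo and Run are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    // Returns what two late subscribers receive from the same subject.
    public static (List<int> First, List<int> Second) Run()
    {
        // bufferSize = 2: only the two most recent items are retained.
        var subject = new ReplaySubject<int>(2);

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3); // pushes 1 out of the buffer
        subject.OnNext(4); // pushes 2 out of the buffer

        var first = new List<int>();
        var second = new List<int>();

        // Each subscriber gets the buffered items replayed on subscription.
        // Replaying does not consume them, unlike reading from a Channel<T>.
        using (subject.Subscribe(first.Add)) { }
        using (subject.Subscribe(second.Add)) { }

        return (first, second);
    }

    public static void Main()
    {
        var (first, second) = Run();
        Console.WriteLine(string.Join(", ", first));  // prints "3, 4"
        Console.WriteLine(string.Join(", ", second)); // prints "3, 4"
    }
}
```

Both subscribers see the same two items, which is why the ReplaySubject<T> behaves like a bounded replay cache rather than a queue that consumers drain.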

The ReplaySubject<T> is used as the propagator by the Replay operator, similarly to how the Publish operator is backed internally by a Subject<T>.

CodePudding user response:

What you’re describing sounds similar to what other Rx implementations (like RxJava) call “back pressure”.

This has not been implemented in Rx.NET, and probably never will be.
