I have incoming objects of the same type. If an object's IsThrottlable property
is set to false, I DON'T want to throttle it, regardless of its ID. But if IsThrottlable
is set to true, I would like to throttle the object every 3 seconds by ID. So if an object with the same ID comes in 50 times within 3 seconds, I would like to call HTTPSend only for the last object.
namespace BoatThrottle
{
/// <summary>
/// Notification payload that flows through the send pipeline.
/// </summary>
internal class MData
{
    /// <summary>Identifier of the notification; throttling groups items by this value.</summary>
    public int ID { get; set; }

    /// <summary>
    /// When <c>true</c> the item goes through the throttled path;
    /// when <c>false</c> it is sent immediately.
    /// </summary>
    public bool IsThrottlable { get; set; }

    /// <summary>Free-form text that is written out together with the item.</summary>
    public string Description { get; set; }
}
/// <summary>
/// Console demo: generates random <see cref="MData"/> items and sends them.
/// Non-throttleable items are sent immediately; throttleable items are reduced
/// to the last item per ID for every 3-second window before being sent.
/// </summary>
class Program
{
    // Entry point into the throttled pipeline; SendData pushes throttleable items here.
    private static readonly Subject<MData> _doValues = new Subject<MData>();

    // The single subscription to the throttled pipeline, created on first use by SendData.
    private static IDisposable? _subscription = null;

    /// <summary>Read-only view of the stream of throttleable items.</summary>
    public static IObservable<MData> DoValues { get { return _doValues; } }

    /// <summary>
    /// Generates and sends random items forever, pausing 100-1999 ms between items.
    /// </summary>
    static void Main(string[] args)
    {
        Random rand = new Random();
        while (true)
        {
            var data = GenerateRandomObj(rand);
            SendData(data);
            // Task.Delay only returns a task; without waiting on it the loop would
            // spin with no pause at all. Blocking is acceptable in this synchronous Main.
            Task.Delay(rand.Next(100, 2000)).Wait();
        }
    }

    /// <summary>
    /// Creates an item with a random ID in [1, 19] and a random IsThrottlable flag.
    /// </summary>
    static MData GenerateRandomObj(Random rand)
    {
        return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
    }

    /// <summary>
    /// Sends <paramref name="mData"/> immediately when it is not throttleable;
    /// otherwise feeds it into the throttled pipeline.
    /// </summary>
    /// <remarks>
    /// The lazy subscription is not synchronized; this is sufficient here because
    /// Main calls this method from a single thread.
    /// </remarks>
    static void SendData(MData mData)
    {
        if (!mData.IsThrottlable)
        {
            // MData object coming in IsThrottlable set to false always send this data NO throttling
            HTTPSend(mData);
            return;
        }

        // Subscribe exactly once, and BEFORE publishing: a Subject does not replay,
        // so an item pushed while nobody is subscribed is lost, and subscribing on
        // every call would leak subscriptions and send each item several times.
        // No additional Throttle is applied after ThrottledById: a Throttle over the
        // merged stream debounces across all IDs and would drop all but the last
        // item of each 3-second burst.
        _subscription ??= ThrottledById(DoValues)
            .ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
            .Subscribe(y =>
            {
                HTTPSend(y);
            });

        _doValues.OnNext(mData);
    }

    /// <summary>
    /// Collects items into 3-second buffers and, for each buffer, emits only the
    /// last item seen for every distinct ID.
    /// </summary>
    /// <param name="observable">Stream of items to reduce.</param>
    /// <returns>A stream with at most one item per ID per 3-second buffer.</returns>
    public static IObservable<MData> ThrottledById(IObservable<MData> observable)
    {
        return observable.Buffer(TimeSpan.FromSeconds(3))
            .SelectMany(x =>
                x.GroupBy(y => y.ID)
                 .Select(y => y.Last()));
    }

    /// <summary>
    /// Stand-in for the real HTTP call: writes the item's ID, description and
    /// throttle flag to the console.
    /// </summary>
    static void HTTPSend(MData mData)
    {
        Console.WriteLine($"===============HTTP===>> {mData.ID} {mData.Description} {mData.IsThrottlable}");
    }
}
}
EDIT:
E.g., ALL of the following are received within 3 seconds:
MData ID = 1, IsThrottlable = False, Description = "Notify"
MData ID = 2, IsThrottlable = True, Description = "Notify1"
MData ID = 2, IsThrottlable = True, Description = "Notify2"
MData ID = 9, IsThrottlable = False, Description = "Notify2"
MData ID = 2, IsThrottlable = True, Description = "Notify3"
MData ID = 2, IsThrottlable = True, Description = "Notify4"
MData ID = 3, IsThrottlable = True, Description = "Notify"
MData ID = 4, IsThrottlable = True, Description = "Notify"
MData ID = 5, IsThrottlable = True, Description = "Notify1"
MData ID = 5, IsThrottlable = True, Description = "Notify2"
MData ID = 8, IsThrottlable = True, Description = "Notify1"
MData ID = 8, IsThrottlable = True, Description = "Notify2"
MData ID = 8, IsThrottlable = True, Description = "Notify3"
MData ID = 8, IsThrottlable = True, Description = "Notify4"
MData ID = 8, IsThrottlable = True, Description = "Notify5"
MData ID = 8, IsThrottlable = True, Description = "Notify6"
Expected output after the first 3 seconds:
- MData ID = 1, IsThrottlable = False, Description = "Notify"
- MData ID = 9, IsThrottlable = False, Description = "Notify2"
- MData ID = 2, IsThrottlable = True, Description = "Notify4"
- MData ID = 3, IsThrottlable = True, Description = "Notify"
- MData ID = 4, IsThrottlable = True, Description = "Notify"
- MData ID = 5, IsThrottlable = True, Description = "Notify2"
- MData ID = 8, IsThrottlable = True, Description = "Notify6"
Final Implementation:
/// <summary>
/// Initializes the window and starts a background loop that generates random
/// items and passes them to SendData.
/// </summary>
public MainWindow()
{
    InitializeComponent();
    Random rand = new Random();
    Debug.Print("========================");

    // The generator loop never ends, so it must not run inside the constructor
    // itself: the constructor would never return and the window would never show.
    // Running it as a background task also lets the delay actually be awaited;
    // a bare Task.Delay(...) call whose result is discarded does not pause anything.
    // The task is intentionally not awaited (fire-and-forget).
    _ = Task.Run(async () =>
    {
        while (true)
        {
            var data = GenerateRandomObj(rand);
            SendData(data);
            await Task.Delay(rand.Next(100, 2000));
        }
    });
}
/// <summary>
/// Builds a test item: the ID is drawn first (1 or 2), then the throttle flag
/// (true when the second draw equals 1). The description is a fixed string.
/// </summary>
static MData GenerateRandomObj(Random rand)
{
    int id = rand.Next(1, 3);
    bool throttlable = rand.Next(2) == 1;

    var item = new MData();
    item.ID = id;
    item.Description = "Notification....";
    item.IsThrottlable = throttlable;
    return item;
}
/// <summary>
/// Feeds <paramref name="mData"/> into the pipeline. Non-throttleable items pass
/// straight through to HTTPSend; throttleable items are throttled per ID with a
/// 3-second window.
/// </summary>
/// <remarks>
/// The lazy subscription is not synchronized; callers are expected to invoke this
/// method from one thread at a time.
/// </remarks>
static void SendData(MData mData)
{
    // Build and subscribe to the pipeline exactly once, and do it BEFORE publishing.
    // A Subject does not replay: an item pushed while nobody is subscribed is lost,
    // and creating a new subscription per call leaks subscriptions and makes every
    // later item be sent once per accumulated subscription.
    _subscription ??= DoValues
        .GroupBy(x => x.IsThrottlable)
        .SelectMany(g1 =>
        {
            if (!g1.Key) return g1; // Not throttleable, return it as is.
            return g1
                .GroupBy(x => x.ID)
                .SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
        })
        .Subscribe(y =>
        {
            HTTPSend(y);
        });

    _doValues.OnNext(mData);
}
// Most recent subscription created by SendData; null until the first one is made.
private static IDisposable? _subscription;

// Subject that SendData pushes incoming items into.
private static readonly Subject<MData> _doValues = new Subject<MData>();

// Observable-only view of _doValues, used as the head of the pipeline.
private static IObservable<MData> DoValues => _doValues;
/// <summary>
/// Stand-in for the real HTTP call: writes the item's ID, description and
/// throttle flag, followed by the current local time, to the debug output.
/// A null <paramref name="mData"/> is tolerated and prints empty fields.
/// </summary>
static void HTTPSend(MData mData)
{
    // The original concatenation had lost its '+' operators and did not compile;
    // interpolation yields the same text.
    Debug.Print($"===============HTTP===>> {mData?.ID} {mData?.Description} {mData?.IsThrottlable} {DateTime.Now}");
}
CodePudding user response:
One way to do it is to group the sequence by the IsThrottlable
property. This way you'll get a nested sequence that contains two subsequences, one containing the throttleable elements and one containing the non-throttleable elements. You can then transform each of the two subsequences accordingly, and finally use the SelectMany
operator to flatten the nested sequence back to a flat sequence that contains the elements emitted by the two transformed subsequences.
The subsequence that contains the non-throttleable elements needs no transformation, so you can return it as is.
The subsequence that contains the throttleable elements needs to be grouped further by the ID
property, producing even thinner subsequences that contain only throttleable elements having the same id. These are the sequences that need to be throttled:
// First split by the IsThrottlable flag, then (for the throttleable half only)
// split again by ID and throttle each per-ID sequence independently.
IObservable<MData> throttled = source
    .GroupBy(item => item.IsThrottlable)
    .SelectMany(byFlag => byFlag.Key
        ? byFlag
            .GroupBy(item => item.ID)
            .SelectMany(byId => byId.Throttle(TimeSpan.FromSeconds(3)))
        : byFlag); // Not throttleable, return it as is.
At the end you'll get a flat sequence that contains both the throttleable and the non-throttleable items, with the throttleable items already throttled by id.
The SelectMany
operator is essentially a combination of the Select
and Merge
operators.