This is the WebSocket received-message event handler. However, the subscriptions' data handlers are awaited one after another and currently do not run in parallel, which is why the slow-process-time message below gets logged. The idea is to make these data handlers run in parallel, which means Task.WhenAll should be used. Each data handler is of type Func<T, ValueTask>, so it makes sense to execute them in parallel: if one of the data handlers contains await Task.Delay(5000), it shouldn't block the others.
/// <summary>
/// Handles a message received from the socket: wraps it in a <c>MessageEvent</c> stamped with the
/// local receive time and awaits the data handler of each matching subscription, one after another.
/// </summary>
/// <param name="e">Event arguments carrying the received message.</param>
private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
    var receivedAt = DateTimeOffset.Now;
    var messageEvent = new MessageEvent(e.Message, receivedAt);

    // Lazily filtered: only subscriptions whose request matches the message data are visited.
    var matchingSubscriptions = _subscriptions
        .GetAll()
        .Where(candidate => MessageMatchesHandler(messageEvent.Data, candidate.Request));

    foreach (var subscription in matchingSubscriptions)
    {
        // Each handler is awaited before the next one starts, so a slow handler delays the rest.
        var elapsed = await MeasureUserProcessTime(async () => await subscription.DataHandler(messageEvent));
        if (elapsed.TotalMilliseconds <= 500)
        {
            continue;
        }

        _logger.LogTrace("Detected slow data handler ({UserProcessTimeMs} ms user code), consider offloading data handling to another thread. Data from this socket may arrive late or not at all if message processing is continuously slow.",
            elapsed.TotalMilliseconds);
    }
}
What about the version below?
/// <summary>
/// Starts the data handler of every subscription matching the received message and returns
/// without waiting for those handlers to finish.
/// </summary>
/// <param name="e">Event arguments carrying the received message.</param>
private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
    var receivedAt = DateTimeOffset.Now;
    var messageEvent = new MessageEvent(e.Message, receivedAt);

    // One Task per matching subscription; the handler's ValueTask is converted so it can be combined.
    var handlerTasks =
        from subscription in _subscriptions.GetAll()
        where MessageMatchesHandler(messageEvent.Data, subscription.Request)
        select subscription.DataHandler(messageEvent).AsTask();

    // The combined task is deliberately discarded (not awaited): nothing in this method observes
    // when the handlers complete or whether they fail.
    _ = Task.WhenAll(handlerTasks);
}
or
/// <summary>
/// Dispatches a received socket message to the data handler of every matching subscription via
/// <c>Task.Run</c> and completes once all of those handlers have finished.
/// </summary>
/// <param name="e">Event arguments carrying the received message.</param>
private async ValueTask OnDataReceived(DataReceivedEventArgs e)
{
    var timestamp = DateTimeOffset.Now;
    var messageEvent = new MessageEvent(e.Message, timestamp);
    var handlers = _subscriptions
        .GetAll()
        .Where(subscription => MessageMatchesHandler(messageEvent.Data, subscription.Request))
        // AsTask() makes the lambda return a Task, so Task.Run selects its Func<Task> overload and the
        // returned task tracks the handler's completion. Returning the ValueTask directly would bind to
        // Task.Run<TResult>(Func<TResult>) and yield a Task<ValueTask> that completes as soon as the
        // handler returns its ValueTask, i.e. without waiting for the asynchronous work to finish.
        .Select(subscription => Task.Run(() => subscription.DataHandler(messageEvent).AsTask()));
    await Task.WhenAll(handlers);
}
CodePudding user response:
The version with await Task.WhenAll is fine. I assume you still want to keep MeasureUserProcessTime, though, since that will give you useful warnings for long-running handlers.
CodePudding user response:
I suppose you have some synchronization mechanism based on SemaphoreSlim instances somewhere, but anyway... why don't you simply fire and forget, if you don't do anything with the results of your tasks?
/// <summary>
/// Dispatches a received socket message to the data handler of every matching subscription in a
/// fire-and-forget fashion: each handler is started via <c>Task.Run</c> and this method returns
/// without waiting for any of them.
/// </summary>
/// <param name="e">Event arguments carrying the received message; a null value is logged and ignored.</param>
private void OnDataReceived(DataReceivedEventArgs e)
{
    if (e is null)
    {
        _logger.LogWarning("Received null event arguments in OnDataReceived; the message is ignored.");
        return;
    }

    var timestamp = DateTimeOffset.Now;
    var messageEvent = new MessageEvent(e.Message, timestamp);

    var matchingSubscriptions = _subscriptions
        .GetAll()
        .Where(candidate => MessageMatchesHandler(messageEvent.Data, candidate.Request));

    foreach (var subscription in matchingSubscriptions)
    {
        // The lambda must be async so the handler can be awaited inside it; the resulting task is
        // explicitly discarded because nothing consumes the handlers' outcome here.
        _ = Task.Run(async () =>
        {
            try
            {
                var userProcessTime = await MeasureUserProcessTime(async () => await subscription.DataHandler(messageEvent));
                if (userProcessTime.TotalMilliseconds > 500)
                {
                    _logger.LogTrace("Detected slow data handler ({UserProcessTimeMs} ms user code), consider offloading data handling to another thread. Data from this socket may arrive late or not at all if message processing is continuously slow.",
                        userProcessTime.TotalMilliseconds);
                }
            }
            catch (Exception ex)
            {
                // Nobody awaits this task, so a failure would otherwise vanish silently; log it instead.
                _logger.LogError(ex, "A data handler threw an exception while processing a received message.");
            }
        });
    }
}