I have the following code that gets called from a Controller:
public async Task Execute()
{
    var objects = await _repo.GetObjects(); // This gets 500 items
    List<Object1> object1s = new List<Object1>();
    List<Object2> object2s = new List<Object2>();
    // "object" is a reserved keyword in C#, so the loop variable is named obj
    foreach (var obj in objects)
    {
        if (obj == "Something")
        {
            var specialObject = TurnObjectIntoSpecialObject(obj);
            object2s.Add(specialObject);
        }
        else
        {
            var anotherObject = TurnObjectIntoAnotherObject(obj);
            object1s.Add(anotherObject);
        }
    }
    var list1Async = object1s.Select(async obj => await restService.PostObject1(obj)); // each call takes 200 -> 2000ms
    var list2Async = object2s.Select(async obj => await restService.PostObject2(obj)); // each call takes 300 -> 3000ms
    var asyncTasks = list1Async.Concat<Task>(list2Async);
    await Task.WhenAll(asyncTasks); // As 500 'tasks'
}
Unfortunately, I'm getting a 504 error after around 300 or so requests. I can't change the API that the RestService calls, so I'm stuck trying to make the above code more performant.
Changing Task.WhenAll to a foreach loop does work, and does resolve the timeout, but it's very slow.
My question is: how can I make sure the above code does not time out after x number of requests?
CodePudding user response:
Since you are unable to change the REST service, the 504 Gateway Timeout will remain. A better solution would be to use a retry mechanism: if you receive a 504 error code, retry after 'x' seconds.
Why retry after 'x' seconds? There could be many reasons for the 504. For example, the server may be unable to handle more requests because it is at its maximum workload at the moment.
A good, battle-tested library for retry mechanisms is Polly.
You could also write your own function or action depending on the return type.
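A hand-written retry function could look roughly like the sketch below. It assumes the REST call returns an HttpResponseMessage (the question does not say what restService returns). The helper name PostWithRetryAsync, the attempt limit, and the linear back-off delay are illustrative choices, not part of the original code. A fake sender stands in for the real service so the example runs on its own.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper: repeats the call while the response is 504 Gateway Timeout.
// maxAttempts and baseDelay are illustrative values, not recommendations.
static async Task<HttpResponseMessage> PostWithRetryAsync(
    Func<Task<HttpResponseMessage>> send, int maxAttempts, TimeSpan baseDelay)
{
    for (int attempt = 1; ; attempt++)
    {
        HttpResponseMessage response = await send();
        if (response.StatusCode != HttpStatusCode.GatewayTimeout || attempt >= maxAttempts)
            return response; // success, a different status, or out of attempts

        response.Dispose();
        // Linear back-off: wait a little longer after each failed attempt.
        await Task.Delay(baseDelay * attempt);
    }
}

// Fake sender (assumption for the demo): answers 504 twice, then 200.
int calls = 0;
Func<Task<HttpResponseMessage>> fakeSend = () =>
{
    calls++;
    var status = calls < 3 ? HttpStatusCode.GatewayTimeout : HttpStatusCode.OK;
    return Task.FromResult(new HttpResponseMessage(status));
};

HttpResponseMessage result =
    await PostWithRetryAsync(fakeSend, 5, TimeSpan.FromMilliseconds(10));
Console.WriteLine($"{(int)result.StatusCode} after {calls} calls"); // prints "200 after 3 calls"
```

In real code the lambda passed as send would wrap the actual call, for example () => restService.PostObject1(obj), provided that method exposes the HTTP status.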
Depending on the happy-flow use case, other methods could be used, but in this situation I assumed that you want to upload all the data even if an exception occurs.
Something to think about: if this service is provided by a third-party vendor, look into its documentation. It will most likely have a section on the maximum number of concurrent connections to the service.
CodePudding user response:
Making more concurrent calls to a remote site or database doesn't improve throughput, quite the opposite. Conflicts between the concurrent operations mean that beyond a certain point everything will start taking more time, until the service crashes. Right now you have a possible 300-way blocking problem.
The way all services handle this is by restricting the number of concurrent connections. In fact, many services will throttle clients so they don't crash if someone ... sends 500 concurrent requests. Some may even tell you they're throttling you with a 429 response.
Another way is to use batch requests, so instead of making 300 or 500 calls you can send a batch of 500 operations, allowing the service to handle them in an efficient way.
You can use Parallel.ForEachAsync to execute multiple calls with a specified degree of parallelism:
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 30
};
// The body delegate receives the item and a CancellationToken
await Parallel.ForEachAsync(object1s, parallelOptions, async (obj, ct) =>
{
    await restService.PostObject1(obj);
});
await Parallel.ForEachAsync(object2s, parallelOptions, async (obj, ct) =>
{
    await restService.PostObject2(obj);
});
You can adjust the DOP to find what works best without slowing down the remote service.
If the service can handle it, you could start both pipelines concurrently and await both of them to complete:
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 30
};
// Start both loops without awaiting, so they run side by side
var task1 = Parallel.ForEachAsync(object1s, parallelOptions, async (obj, ct) =>
{
    await restService.PostObject1(obj);
});
var task2 = Parallel.ForEachAsync(object2s, parallelOptions, async (obj, ct) =>
{
    await restService.PostObject2(obj);
});
await Task.WhenAll(task1, task2);
It doesn't make sense to retry those operations before you limit the DOP. Retrying 500 failed requests will only lead to another failure. Retrying with a random or staggered delay is essentially the same as limiting the DOP from the start, except it takes far longer to complete.
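Parallel.ForEachAsync only exists from .NET 6 onward. On an older runtime, the same cap on concurrent calls can be approximated with a SemaphoreSlim, as in the sketch below. The helper name ForEachThrottledAsync is made up for illustration, and Task.Delay stands in for the REST call. This is a swapped-in technique, not what the answer above uses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs action for every item, with at most
// maxConcurrency invocations in flight at any moment.
static async Task ForEachThrottledAsync<T>(
    IEnumerable<T> items, int maxConcurrency, Func<T, Task> action)
{
    using var gate = new SemaphoreSlim(maxConcurrency);
    List<Task> tasks = items.Select(async item =>
    {
        await gate.WaitAsync();      // wait for a free slot
        try { await action(item); }
        finally { gate.Release(); }  // free the slot even if the call failed
    }).ToList();
    await Task.WhenAll(tasks);
}

// Demo: record the highest number of calls that were running at the same time.
int running = 0, peak = 0;
var sync = new object();
await ForEachThrottledAsync(Enumerable.Range(0, 20), 3, async i =>
{
    lock (sync) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(20);            // stands in for restService.PostObject1(...)
    lock (sync) { running--; }
});
Console.WriteLine($"peak concurrency: {peak}"); // never more than 3
```

Unlike Parallel.ForEachAsync, this version creates one task per item up front; the tasks simply queue on the semaphore until a slot is free.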
CodePudding user response:
This answer assumes that you are using .NET 6 or later. You could project the objects to an enumerable of object elements, and then parallelize the processing of the projected objects with the Parallel.ForEachAsync method. This method allows you to configure the MaxDegreeOfParallelism, so that not all projected objects are processed at once. As a result, the remote server will not be bombarded with more requests than it can handle, nor will the network bandwidth be saturated. The optimal MaxDegreeOfParallelism can be found by experimentation. Start with a small number, like 5, and then gradually increase it until you find the sweet spot that offers the best performance.
public async Task Execute()
{
    var objects = await _repo.GetObjects();
    IEnumerable<object> projected = objects.Select(obj =>
    {
        if (obj == "Something")
        {
            return (object)TurnObjectIntoSpecialObject(obj);
        }
        else
        {
            return (object)TurnObjectIntoAnotherObject(obj);
        }
    });
    ParallelOptions options = new()
    {
        MaxDegreeOfParallelism = 5
    };
    await Parallel.ForEachAsync(projected, options, async (item, ct) =>
    {
        switch (item)
        {
            case Object1 obj1: await restService.PostObject1(obj1); break;
            case Object2 obj2: await restService.PostObject2(obj2); break;
            default: throw new NotImplementedException();
        }
    });
}
The above code processes the objects in the same order that they appear in the objects sequence. The PostObject1/PostObject2 operations are parallelized, but the TurnObjectIntoSpecialObject/TurnObjectIntoAnotherObject operations are not. If you want to parallelize these too, then you can feed the Parallel.ForEachAsync with the objects, and do the projection inside the parallel loop.
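Doing the projection inside the loop could look like the following sketch. All the local functions are stand-ins for the question's methods: plain strings replace Object1/Object2, and a ConcurrentBag records what was "posted" so the example runs without a real service.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Stand-ins (assumptions) for the question's conversion methods and REST calls.
static string TurnObjectIntoSpecialObject(string s) => "special:" + s;
static string TurnObjectIntoAnotherObject(string s) => "another:" + s;
var posted = new ConcurrentBag<string>();
Task PostObject1(string o) { posted.Add(o); return Task.Delay(10); }
Task PostObject2(string o) { posted.Add(o); return Task.Delay(10); }

string[] objects = { "Something", "a", "b", "Something", "c" };
ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };

// The raw objects are fed to the loop; the conversion runs in the loop body,
// so it is parallelized together with the posting.
await Parallel.ForEachAsync(objects, options, async (obj, ct) =>
{
    if (obj == "Something")
        await PostObject2(TurnObjectIntoSpecialObject(obj));
    else
        await PostObject1(TurnObjectIntoAnotherObject(obj));
});

Console.WriteLine(posted.Count); // prints 5
```

This only pays off when the conversion itself is expensive; for cheap conversions the extra parallelism makes no measurable difference.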
In case of errors, only the first exception will be propagated. If you want to propagate all the errors, you can find solutions here.
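One possible way to see every failure, sketched under the assumption that you want all items attempted regardless of errors, is to catch inside the loop body and collect the exceptions. Note that this changes the behavior: the loop no longer stops early when one call fails. The failing condition and Task.Delay below are placeholders for a real REST call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var errors = new ConcurrentQueue<Exception>();
int[] items = Enumerable.Range(1, 10).ToArray();
ParallelOptions options = new() { MaxDegreeOfParallelism = 3 };

await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
    try
    {
        await Task.Delay(10, ct); // stands in for a REST call
        if (item % 4 == 0)        // placeholder failure: items 4 and 8
            throw new InvalidOperationException($"item {item} failed");
    }
    catch (Exception ex)
    {
        errors.Enqueue(ex);       // record the failure and keep going
    }
});

// Bundle everything that went wrong; the caller can rethrow or log it.
AggregateException? all = errors.IsEmpty ? null : new AggregateException(errors);
Console.WriteLine($"{errors.Count} failures"); // prints "2 failures"
```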