My application hangs (and is eventually force-killed by Polly's circuit breaker or timeout) when trying to concurrently receive and deserialize very large JSON files containing over 12,000,000 characters.
var ct = cancellationToken;
Parallel.For(0, 1000000, (i, state) =>
{
    var strategy = GetPollyResilienceStrategy(); // with retry, timeout, and circuit-breaker
    await strategy.ExecuteAsync(async () =>
    {
        var stream = await httpClient.GetStreamAsync(getEndPoint(i), ct);
        var foo = await JsonSerializer.DeserializeAsync<Foo>(stream, ct);
        process(foo);
    });
});
Context
My .NET console application receives very large JSON files over HTTP and deserializes each one, reading only the fields the application needs and ignoring the rest. A successful run is expected to take a few days, but the program "hangs" after a few hours. After extensive debugging, it turns out that an increasing number of threads are stuck trying to deserialize the JSON, and the program hangs (i.e., Parallel.For does not start another iteration) once ~5 threads are stuck. Each time it gets stuck at a different i, and since the JSON objects are very large, it is not feasible to log every received JSON for debugging.
Why does it get stuck? Is there a built-in maximum capacity in JsonSerializer that is being reached, e.g., a buffer size? Is it possible that GetStreamAsync is reading corrupt data, hence JsonSerializer is stuck in some corner case trying to deserialize corrupt JSON?
I found this thread relevant, though I am not sure if there was a resolution other than "fixed in a newer version": https://github.com/dotnet/runtime/issues/41604
The program eventually exits, but only as a result of either the circuit breaker or the timeout. I have set very long intervals in the resilience strategy, e.g., giving the process 20 minutes to try deserializing the JSON before retrying.
CodePudding user response:
Even without knowing how you set up your resilience strategy, it seems like you want to kill two birds with one stone:
- Add resilient behaviour for the HTTP-based communication
- Add resilient behaviour for the stream parsing
I would recommend separating these two concerns.
GetStreamAsync
The GetStreamAsync call returns a Task<Stream>, which does not allow you to access the underlying HttpResponseMessage.
But if you issue your request for the stream like this:
var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead);
using var stream = await response.Content.ReadAsStreamAsync();
then you will be able to decorate the GetAsync call with an HTTP-based Polly policy definition.
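As a rough sketch of what that decoration could look like (the policy shape, retry counts, and timeouts below are illustrative assumptions, not taken from the question's GetPollyResilienceStrategy):

```csharp
using System;
using System.Net.Http;
using Polly;
using Polly.Timeout;

// HTTP-only resilience: retries transient failures and enforces a per-request
// timeout around GetAsync. Stream parsing is deliberately outside this policy.
var httpTimeout = Policy.TimeoutAsync<HttpResponseMessage>(
    TimeSpan.FromSeconds(100), TimeoutStrategy.Optimistic);

var httpRetry = Policy<HttpResponseMessage>
    .Handle<HttpRequestException>()
    .OrResult(r => !r.IsSuccessStatusCode)
    .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

var httpPolicy = Policy.WrapAsync(httpRetry, httpTimeout);

// Only the request/headers phase is guarded; the body is streamed afterwards.
var response = await httpPolicy.ExecuteAsync(
    token => httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, token),
    ct);
using var stream = await response.Content.ReadAsStreamAsync(ct);
```

Because HttpCompletionOption.ResponseHeadersRead returns as soon as the headers arrive, the HTTP policy's timeout covers connecting and receiving headers rather than the entire multi-minute body download.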
DeserializeAsync
Looking at this problem purely from a resilience perspective, it would make sense to use a combination of CancellationTokenSources to enforce a timeout, like this:
CancellationTokenSource userCancellation = ...;
var timeoutSignal = new CancellationTokenSource(TimeSpan.FromMinutes(20));
var combinedCancellation = CancellationTokenSource.CreateLinkedTokenSource(userCancellation.Token, timeoutSignal.Token);
...
var foo = await JsonSerializer.DeserializeAsync<Foo>(stream, combinedCancellation.Token);
But you could achieve the same with an optimistic timeout policy:
var foo = await timeoutPolicy.ExecuteAsync(
async token => await JsonSerializer.DeserializeAsync<Foo>(stream, token), ct);
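For completeness, a minimal sketch of how that timeoutPolicy could be defined (the 20-minute value mirrors the question; everything else is an assumption):

```csharp
using System;
using Polly;
using Polly.Timeout;

// Optimistic timeouts rely on the executed delegate honoring the
// CancellationToken that Polly passes in, which
// JsonSerializer.DeserializeAsync does.
var timeoutPolicy = Policy.TimeoutAsync(
    TimeSpan.FromMinutes(20), TimeoutStrategy.Optimistic);
```

Polly then links its internal timeout token with the ct you pass to ExecuteAsync, so you get the same combined-cancellation behavior as the manual CreateLinkedTokenSource approach above, without managing the CancellationTokenSource lifetimes yourself.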
CodePudding user response:
Parallel.For does not play well with async. I would not even expect your example to compile, since the lambda for the Parallel.For lacks an async keyword. If it did run, I would expect it to start just about all the tasks at the same time, because Parallel.For does not await the async work. Eventually this will likely lead to bad things, like thread-pool exhaustion.
I would suggest using another pattern for your work:
- If using .NET 6, use Parallel.ForEachAsync.
- Keep using Parallel.For, but make your worker method synchronous.
- Use something like LimitedConcurrencyTaskScheduler (see example) to limit the number of concurrent tasks.
- Use Dataflow, but I'm not very familiar with it, so I cannot advise exactly how it should be used.
- Manually split your work into chunks, and run the chunks in parallel.
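The first option could look roughly like this (ProcessItemAsync is a placeholder standing in for the question's fetch-and-deserialize body, and the degree of parallelism is an assumed value):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var options = new ParallelOptions
{
    // Bounds concurrency explicitly instead of letting unawaited
    // async work pile up as it does with Parallel.For.
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = ct
};

await Parallel.ForEachAsync(Enumerable.Range(0, 1_000_000), options, async (i, token) =>
{
    // Unlike Parallel.For, ForEachAsync awaits the async lambda,
    // so at most MaxDegreeOfParallelism items are in flight at once.
    await ProcessItemAsync(i, token);
});
```

This keeps backpressure intact: a slow or stuck deserialization occupies one of the bounded slots instead of silently spawning ever more concurrent downloads.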