I have a bunch of HTTP GET calls to make, but I want to limit it to no more than 5 at once, so I tried to do the following:
var semaphore = new SemaphoreSlim(5, 5);
var threads = new List<Thread>();
var values = new ConcurrentBag<string>();
for (var i = 0; i < callsLeft; i++) {
    var url = $"...";
    var thread = new Thread(async () => {
        await semaphore.WaitAsync(cancellationToken);
        values.Add(await _client.GetStringAsync(url, cancellationToken));
        semaphore.Release();
    });
    threads.Add(thread);
    thread.Start();
}
foreach (var thread in threads)
    thread.Join();
Debug.WriteLine("Done");
I'm having two issues with that.
- The "Done" message is printed before all of the threads have actually completed.
- 5 HTTP calls are made right away, but then it seems to only add one at a time. Essentially all the rest kick off one by one, evenly spaced. There's never a case where two or three start at once, for example.
CodePudding user response:
First of all, HttpClient operations are thread-safe. They don't need locking. Second, all HttpClient operations are asynchronous so they don't need extra threads.
All at once
A quick and dirty way to make concurrent calls would be to just fire off all operations and await all the tasks:
var tasks = urls.Select(url => _client.GetStringAsync(url, cancellationToken))
                .ToArray();
var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
    Console.WriteLine(result);
}
or
var tasks = urls.Select(async url => {
    var result = await _client.GetStringAsync(url, cancellationToken);
    Console.WriteLine("{0}\t{1}", url, result);
});
await Task.WhenAll(tasks);
Controlled concurrency with Parallel.ForEachAsync
A better way would be to use Parallel.ForEachAsync (available since .NET 6) to limit the number of concurrent operations. The default limit is the value of Environment.ProcessorCount:
await Parallel.ForEachAsync(urls, cancellationToken, async (url, token) => {
    // Without the await, result would be the Task itself rather than the response body.
    var result = await _client.GetStringAsync(url, token);
    Console.WriteLine("{0}\t{1}", url, result);
});
Since all operations are asynchronous, we can start more than the available core count:
ParallelOptions options = new()
{
    MaxDegreeOfParallelism = 10,
    // With ParallelOptions, the token is passed through the options object.
    CancellationToken = cancellationToken
};
await Parallel.ForEachAsync(urls, options, async (url, token) => {
    var result = await _client.GetStringAsync(url, token);
    Console.WriteLine("{0}\t{1}", url, result);
});
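To check that MaxDegreeOfParallelism really caps the number of operations in flight, here is a minimal sketch that replaces the HTTP call with Task.Delay and records the peak concurrency. ThrottleDemo, MeasurePeakAsync and the 50 ms delay are hypothetical names and values chosen for illustration, not part of the original code.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs `count` simulated downloads and returns the highest number
    // that were in flight at the same time.
    public static async Task<int> MeasurePeakAsync(int count, int maxParallel)
    {
        var gate = new object();
        int inFlight = 0;
        int peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        await Parallel.ForEachAsync(Enumerable.Range(0, count), options, async (i, token) =>
        {
            lock (gate)
            {
                inFlight++;
                peak = Math.Max(peak, inFlight);
            }

            // Assumption: stands in for _client.GetStringAsync(url, token).
            await Task.Delay(50, token);

            lock (gate)
            {
                inFlight--;
            }
        });

        return peak;
    }
}
```

Calling await ThrottleDemo.MeasurePeakAsync(20, 5) should return a value no greater than 5, however many items are queued.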
Multi-step Processing Pipeline
Another option is to use TPL Dataflow blocks to construct a pipeline that retrieves results in one step and processes them in another, again with a controlled degree of concurrency.
var dlOptions = new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 10
};
var downloader = new TransformBlock<string, string>(
    url => _client.GetStringAsync(url, cancellationToken),
    dlOptions);
var parser = new TransformBlock<string, Something>(ParseIntoSomething);
var importer = new ActionBlock<Something>(ImportInDb);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloader.LinkTo(parser, linkOptions);
parser.LinkTo(importer, linkOptions);
Once we set up the pipeline, we can start posting messages to the head block and await the tail to complete:
foreach (var url in urls)
{
    downloader.Post(url);
}
downloader.Complete();
await importer.Completion;
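In this case, at most 10 downloads are executed concurrently. The parser and importer blocks keep the default MaxDegreeOfParallelism of 1, so each of them uses a single task and handles one message at a time: one parses the responses, the other stores them in the database.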
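For reference, here is a self-contained sketch of the same pipeline shape that runs without a network or database. PipelineDemo, RunAsync, the 20 ms delay and the upper-casing "download" step are hypothetical stand-ins. On older frameworks the System.Threading.Tasks.Dataflow NuGet package may be needed.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineDemo
{
    // Pushes each input through a concurrent "download" step and a
    // single-threaded "store" step, returning the stored lengths.
    public static async Task<List<int>> RunAsync(IEnumerable<string> inputs)
    {
        var stored = new List<int>();

        // Up to 10 simulated downloads at once.
        var downloader = new TransformBlock<string, string>(
            async url =>
            {
                await Task.Delay(20);           // assumption: stands in for GetStringAsync
                return url.ToUpperInvariant();  // assumption: stands in for the response body
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

        // Default MaxDegreeOfParallelism is 1, so List<int>.Add is never called concurrently.
        var importer = new ActionBlock<string>(body => stored.Add(body.Length));

        downloader.LinkTo(importer, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in inputs)
        {
            downloader.Post(input);
        }
        downloader.Complete();      // no more input; completion propagates to importer
        await importer.Completion;  // wait until everything has been stored

        return stored;
    }
}
```

By default TransformBlock emits its outputs in the order the inputs were posted, even when the downloads finish out of order, so the stored lengths line up with the input sequence.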
CodePudding user response:
Do not use the Thread constructor; there is no reason to (I would say that in modern .NET there is almost no reason to use Threads directly).
In this particular case the problem is that Thread is not Task-aware: the async lambda becomes an async void method, so the thread does not wait for the async operation to complete and returns at the first await. That is why Join returns early. Better to just switch to Tasks. With minimal changes it can look like this:
var tasks = new List<Task>();
for (var i = 0; i < callsLeft; i++) {
    var url = $"...";
    var task = Task.Run(async () =>
    {
        // Wait asynchronously for one of the 5 slots.
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            values.Add(await _client.GetStringAsync(url, cancellationToken));
        }
        finally
        {
            // Free the slot even if the request throws or is cancelled.
            semaphore.Release();
        }
    });
    tasks.Add(task);
}
await Task.WhenAll(tasks);
Debug.WriteLine("Done");
Or you can consider using Parallel.ForEachAsync with maximum concurrency specified:
await Parallel.ForEachAsync(urls,
    new ParallelOptions
    {
        MaxDegreeOfParallelism = 5,
        CancellationToken = cancellationToken
    },
    async (url, ct) =>
    {
        values.Add(await _client.GetStringAsync(url, ct));
    });
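To see why the Thread-based version in the question prints "Done" too early, here is a small sketch that compares Thread.Join with awaiting a Task. JoinDemo, its two methods and the 200 ms delay are hypothetical, and Task.Delay stands in for the HTTP call.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class JoinDemo
{
    // Returns true if the work had finished by the time Join() returned.
    public static bool ThreadFinishedWork()
    {
        var finished = new ManualResetEventSlim(false);

        // The lambda is converted to ThreadStart (a void delegate),
        // so it is compiled as an async void method.
        var thread = new Thread(async () =>
        {
            await Task.Delay(200);   // the thread's method returns here
            finished.Set();          // runs later on a thread-pool thread
        });
        thread.Start();
        thread.Join();               // only waits until the first incomplete await

        return finished.IsSet;
    }

    // Returns true if the work had finished by the time the await completed.
    public static async Task<bool> TaskFinishedWorkAsync()
    {
        var finished = new ManualResetEventSlim(false);

        // Task.Run unwraps the inner Task, so awaiting it covers the whole lambda.
        await Task.Run(async () =>
        {
            await Task.Delay(200);
            finished.Set();
        });

        return finished.IsSet;
    }
}
```

Under these assumptions ThreadFinishedWork should report false, because Join returns while the delay is still pending, whereas TaskFinishedWorkAsync should report true.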