C# make concurrent HTTP calls

Time:12-14

I have a bunch of HTTP GET calls to make, but I want to limit it to no more than 5 at once, so I tried to do the following:

var semaphore = new SemaphoreSlim(5, 5);
var threads = new List<Thread>();
var values = new ConcurrentBag<string>();

for (var i = 0; i < callsLeft; i++) {
    var url = $"...";

    var thread = new Thread(async () => {
        await _semaphore.WaitAsync(cancellationToken);

        values.Add(await _client.GetStringAsync(url, cancellationToken));

        _semaphore.Release();
    });

    threads.Add(thread);
    thread.Start();
}

foreach (var thread in threads)
    thread.Join();

Debug.WriteLine("Done");

I'm having two issues with that.

  1. The done message is printed before all of the threads actually complete.
  2. 5 HTTP calls are made right away, but then it seems to only add one at a time. Essentially all the rest kick off one by one, evenly spaced. There's never a case where two or three start at once, for example.

CodePudding user response:

First of all, HttpClient operations are thread-safe. They don't need locking. Second, all HttpClient operations are asynchronous so they don't need extra threads.

All at once

A quick and dirty way to make concurrent calls would be to just fire off all operations and await all the tasks:

var tasks=urls.Select(url=>_client.GetStringAsync(url, cancellationToken))
              .ToArray();

var results=await Task.WhenAll(tasks);

foreach(var result in results)
{
    Console.WriteLine(result);
}

or

var tasks=urls.Select(async url=>{
    var result=await _client.GetStringAsync(url, cancellationToken);
    Console.WriteLine("{0}\t{1}",url,result);
});

await Task.WhenAll(tasks);

Controlled concurrency with Parallel.ForEachAsync

A better way would be to use Parallel.ForEachAsync (available since .NET 6) to limit the number of concurrent operations. The default degree of parallelism is the value of Environment.ProcessorCount:

await Parallel.ForEachAsync(urls, cancellationToken, async (url,token)=>{
    var result=await _client.GetStringAsync(url,token);
    Console.WriteLine("{0}\t{1}",url,result);
});

Since all operations are asynchronous, we can start more than the available core count:

ParallelOptions options= new()
{
    MaxDegreeOfParallelism = 10,
    CancellationToken = cancellationToken
};
await Parallel.ForEachAsync(urls, options, async (url,token)=>{
    var result=await _client.GetStringAsync(url,token);
    Console.WriteLine("{0}\t{1}",url,result);
});
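
To check that the limit is actually respected, the following is a minimal sketch that replaces the HTTP call with Task.Delay and records the highest number of operations in flight. The URLs, the counters and the 50 ms delay are all made up for illustration; only Parallel.ForEachAsync and ParallelOptions come from the answer above.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for real URLs; no network access is needed.
var urls = Enumerable.Range(1, 20)
                     .Select(i => $"https://example.invalid/item/{i}")
                     .ToArray();

var running = 0;    // operations currently in flight
var peak = 0;       // highest value of `running` observed so far
var completed = 0;  // operations that ran to the end
var gate = new object();

var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    lock (gate)
    {
        running++;
        peak = Math.Max(peak, running);
    }

    // Simulated download; a real body would await _client.GetStringAsync(url, token).
    await Task.Delay(50, token);

    lock (gate)
    {
        running--;
        completed++;
    }
});

Console.WriteLine($"completed={completed}, peak concurrency={peak}");
```

With MaxDegreeOfParallelism = 5 the reported peak should never exceed 5, whatever the core count of the machine.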

Multi-step Processing Pipeline

Another option is to use TPL Dataflow blocks to construct a pipeline that retrieves results in one step and processes them in later steps, again with a controlled degree of concurrency.

var dlOptions = new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism=10
};
var downloader=new TransformBlock<string,string>(
                     url => _client.GetStringAsync(url,cancellationToken),
                     dlOptions);
var parser=new TransformBlock<string,Something>(ParseIntoSomething);
var importer=new ActionBlock<Something>(ImportInDb);

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
downloader.LinkTo(parser,linkOptions);
parser.LinkTo(importer,linkOptions);

Once we set up the pipeline, we can start posting messages to the head block and await the tail to complete:

foreach(var url in urls)
{
    downloader.Post(url);
}

downloader.Complete();
await importer.Completion;

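In this case, at most 10 downloads are executed concurrently. The parser and importer blocks keep the default MaxDegreeOfParallelism of 1, so each of them processes one message at a time, parsing a response and storing it in the database. Each of those blocks uses 1 task.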
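
For anyone who wants to try the pipeline without a server or a database, here is a rough, self-contained sketch. The download is simulated with Task.Delay, "parsing" just takes the length of the body, and "importing" appends to a list; these stand-ins and the input names are assumptions replacing ParseIntoSomething and ImportInDb, which are not shown in the answer. On .NET Framework the blocks come from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical inputs standing in for real URLs.
var urls = new[] { "a", "b", "c", "d" };
var imported = new List<int>();

var dlOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 };

// Stage 1: simulated download, up to 10 at a time.
var downloader = new TransformBlock<string, string>(async url =>
{
    await Task.Delay(20);
    return $"body of {url}";
}, dlOptions);

// Stage 2: stand-in parser, one message at a time (default options).
var parser = new TransformBlock<string, int>(body => body.Length);

// Stage 3: stand-in importer. It also runs one message at a time,
// so the plain List<int> is never touched concurrently.
var importer = new ActionBlock<int>(length => imported.Add(length));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloader.LinkTo(parser, linkOptions);
parser.LinkTo(importer, linkOptions);

foreach (var url in urls)
{
    downloader.Post(url);
}

// Signal that no more input is coming, then wait for the tail block to drain.
downloader.Complete();
await importer.Completion;

Console.WriteLine($"imported {imported.Count} items");
```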

CodePudding user response:

Do not use the Thread constructor; there is no reason to (I would say that in modern .NET there is almost never a reason to use threads directly).

In this particular case the problem is that Thread is not Task-aware. The async lambda is compiled as an async void method, so the thread's delegate returns at the first await that does not complete synchronously, and Join does not wait for the rest of the operation. It is better to just switch to Tasks. With minimal changes it can look like this:

var tasks = new List<Task>();
for (var i = 0; i < callsLeft; i++) {
    var url = $"...";

    var task = Task.Run(async () =>
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            values.Add(await _client.GetStringAsync(url, cancellationToken));
        }
        finally
        {
            // Release even if the request throws, so a slot is not lost
            semaphore.Release();
        }
    });

    tasks.Add(task);
}

await Task.WhenAll(tasks);

Debug.WriteLine("Done");
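
The difference between the two approaches can be observed with a small experiment. This is only a sketch: the 200 ms delay stands in for the HTTP call, and the variable names are invented for the demonstration. It assumes .NET 5 or later for the non-generic TaskCompletionSource.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Variant 1: Thread with an async lambda. The lambda becomes async void,
// so the thread's delegate returns at the first incomplete await.
var bodyDone = new TaskCompletionSource();
var thread = new Thread(async () =>
{
    await Task.Delay(200);   // simulated HTTP call
    bodyDone.SetResult();    // runs later, on a thread pool thread
});
thread.Start();
thread.Join();               // returns as soon as the delegate returns

var finishedAtJoin = bodyDone.Task.IsCompleted;
Console.WriteLine($"Body finished when Join returned: {finishedAtJoin}");

// Let the orphaned continuation finish before moving on.
await bodyDone.Task;

// Variant 2: Task.Run with an async lambda. Task.Run returns a task that
// completes only when the whole async body has completed.
var taskBodyFinished = false;
var task = Task.Run(async () =>
{
    await Task.Delay(200);   // simulated HTTP call
    taskBodyFinished = true;
});
await task;

var finishedAtAwait = taskBodyFinished;
Console.WriteLine($"Body finished when await returned: {finishedAtAwait}");
```

The first line should report False and the second True, which matches the "Done" message appearing too early in the question.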

Or you can consider using Parallel.ForEachAsync with maximum concurrency specified:

await Parallel.ForEachAsync(urls,
    new ParallelOptions
    {
        MaxDegreeOfParallelism = 5,
        CancellationToken = cancellationToken
    },
    async (url, ct) =>
    {
        values.Add(await _client.GetStringAsync(url, ct));
    });
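
Parallel.ForEachAsync only exists from .NET 6 on. On older runtimes, the semaphore idea from the question still works without Task.Run or threads, since the HTTP calls are already asynchronous. Below is a sketch of that under stated assumptions: the URLs are invented, Task.Delay replaces _client.GetStringAsync, and FetchThrottledAsync is a hypothetical helper name, not something from the question.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for real URLs; no network access is needed.
var urls = Enumerable.Range(1, 20)
                     .Select(i => $"https://example.invalid/item/{i}")
                     .ToArray();

using var semaphore = new SemaphoreSlim(5, 5);
var running = 0;   // operations currently holding a semaphore slot
var peak = 0;      // highest value of `running` observed so far
var gate = new object();

// Hypothetical helper: waits for a free slot, does the work, frees the slot.
async Task<string> FetchThrottledAsync(string url)
{
    await semaphore.WaitAsync();
    try
    {
        lock (gate)
        {
            running++;
            peak = Math.Max(peak, running);
        }

        // Simulated download; a real version would await _client.GetStringAsync(url).
        await Task.Delay(50);
        return $"response for {url}";
    }
    finally
    {
        lock (gate) { running--; }
        semaphore.Release();   // always give the slot back
    }
}

// Start every operation; the semaphore keeps at most 5 past the gate at once.
var results = await Task.WhenAll(urls.Select(url => FetchThrottledAsync(url)));

Console.WriteLine($"results={results.Length}, peak concurrency={peak}");
```

Task.WhenAll returns the results in the order of the input tasks, so results[0] belongs to the first URL regardless of which call finished first.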