C# not waiting for all Tasks to be performed

Time:08-24

I'm trying to send multiple requests at the same time to a pi digits API. The main problem is that, despite the 'Task.WhenAll(ExecuteRequests()).Wait();' line, not all tasks seem to complete. The code should execute 50 requests and add their results to the pi dictionary, but after it runs the dictionary has only about 44 to 46 items. I tried adding a check of the available ThreadPool threads to guarantee there are enough threads, but nothing changed.

The other problem is that sometimes, when I run the code, I get an error saying I'm trying to add an already-added key to the dictionary. Isn't the lock statement supposed to guarantee that this error doesn't occur?

        const int TotalRequests = 50;
        static int requestsCount = 0;
        static Dictionary<int, string> pi = new();
        static readonly object lockState = new();

        static void Main(string[] args)
        {
            var timer = new Stopwatch();
            timer.Start();

            Task.WhenAll(ExecuteRequests()).Wait();

            timer.Stop();

            foreach (var item in pi.OrderBy(x => x.Key))
                Console.Write(item.Value);

            Console.WriteLine($"\n\n{timer.ElapsedMilliseconds}ms");
            Console.WriteLine($"\n{pi.Count} items");
        }

        static List<Task> ExecuteRequests()
        {
            var tasks = new List<Task>();

            for (int i = 0; i < TotalRequests; i++)
            {
                ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);

                while (workerThreads < 1)
                {
                    ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
                    Thread.Sleep(100);
                }

                tasks.Add(Task.Run(async () =>
                {
                    var currentRequestId = 0;

                    lock (lockState)
                        currentRequestId = requestsCount++;

                    var httpClient = new HttpClient();
                    var result = await httpClient.GetAsync($"https://api.pi.delivery/v1/pi?start={currentRequestId * 1000}&numberOfDigits=1000");

                    if (result.StatusCode == System.Net.HttpStatusCode.OK)
                    {
                        var json = await result.Content.ReadAsStringAsync();
                        var content = JsonSerializer.Deserialize<JsonObject>(json)!["content"]!.ToString();
                        //var content = (await JsonSerializer.DeserializeAsync<JsonObject>(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json)!)!)!)!["content"]!.ToString();

                        pi.Add(currentRequestId, content);
                    }
                }));
            }

            return tasks;
        }

CodePudding user response:

There's only one problem: you protected only one of the two parts of the code that have threading issues:

lock (lockState)
     currentRequestId = requestsCount++;

But there's another one:

pi.Add(currentRequestId, content);

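The problem comes from how Dictionary works: it supports many concurrent readers but only one writer at a time. Unsynchronized concurrent Add calls can corrupt its internal state, which explains both the missing items and the occasional duplicate-key exception. If you wrap the Wait() call in try/catch, you will see an AggregateException, which here wraps the exception thrown inside a task. So you need to do this: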

lock (lockState)
     pi.Add(currentRequestId, content);
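
As a minimal sketch of the fix above, the following simulates the concurrent writers without any HTTP calls. The `LockedWrites` class, the `Fill` method, and the `Task.Yield()` stand-in for the request are assumptions made for illustration; they are not part of the original program. With the lock in place, every entry should end up in the dictionary:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class LockedWrites
{
    // Hypothetical helper: runs `total` concurrent tasks that each add one entry.
    public static Dictionary<int, string> Fill(int total)
    {
        var pi = new Dictionary<int, string>();
        var lockState = new object();

        var tasks = Enumerable.Range(0, total).Select(id => Task.Run(async () =>
        {
            await Task.Yield();           // stand-in for the awaited HTTP request
            var content = $"chunk {id}";  // stand-in for the downloaded digits

            // Dictionary<TKey, TValue> is not safe for concurrent writers,
            // so every Add goes through the same lock.
            lock (lockState)
                pi.Add(id, content);
        })).ToArray();

        Task.WaitAll(tasks);
        return pi;
    }
}
```

Calling `LockedWrites.Fill(50)` returns a dictionary holding one entry per task. Another option, not used in the question, is `ConcurrentDictionary<int, string>` with `TryAdd`, which handles the synchronization internally.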

CodePudding user response:

I put a lock statement around the dictionary manipulation, as @AlexeiLevenkov mentioned, and it worked fine.

                tasks.Add(Task.Run(async () =>
                {
                    var currentRequestId = 0;

                    lock (lockState)
                        currentRequestId = requestsCount++;

                    var httpClient = new HttpClient();
                    var result = await httpClient.GetAsync($"https://api.pi.delivery/v1/pi?start={currentRequestId * 1000}&numberOfDigits=1000");

                    if (result.StatusCode == System.Net.HttpStatusCode.OK)
                    {
                        var json = await result.Content.ReadAsStringAsync();
                        var content = JsonSerializer.Deserialize<JsonObject>(json)!["content"]!.ToString();
                        //var content = (await JsonSerializer.DeserializeAsync<JsonObject>(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json)!)!)!)!["content"]!.ToString();

                        lock (lockState)
                            pi.Add(currentRequestId, content);
                    }
                }));
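
A variation that may be worth considering is to avoid shared state altogether by letting each task return its own result, so that neither the counter nor the dictionary needs a lock. The sketch below is only an illustration under stated assumptions: `PiDownloader`, `DownloadAllAsync`, and the `fetch` delegate are hypothetical names, and `fetch` stands in for the HTTP call (returning null when a request does not succeed).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PiDownloader
{
    // `fetch` is a hypothetical delegate standing in for the HTTP request;
    // it returns the digits for one request id, or null if the request failed.
    public static async Task<List<(int Id, string Content)>> DownloadAllAsync(
        int totalRequests, Func<int, Task<string>> fetch)
    {
        // Each task receives its own id from Range, so no shared counter is needed.
        var tasks = Enumerable.Range(0, totalRequests).Select(async id =>
        {
            var content = await fetch(id);
            return (Id: id, Content: content);
        });

        // WhenAll yields the results in the same order as the tasks.
        var results = await Task.WhenAll(tasks);

        // Drop the requests that did not produce any content.
        return results.Where(r => r.Content != null).ToList();
    }
}
```

In the original program, `fetch` could wrap a single shared `HttpClient` and return the "content" field when the status code is OK. Comparing the returned count against `totalRequests` then shows how many requests failed, as opposed to how many results were lost.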

CodePudding user response:

I'm not directly answering the question, just suggesting that you can use Microsoft's Reactive Framework (aka Rx). Add the System.Reactive NuGet package and using System.Reactive.Linq; and then you can do this:

static void Main(string[] args)
{
    var timer = new Stopwatch();
    timer.Start();

    (int currentRequestId, string content)[] results = ExecuteRequests(50).ToArray().Wait();

    timer.Stop();

    foreach (var item in results.OrderBy(x => x.currentRequestId))
        Console.Write(item.content);

    Console.WriteLine($"\n\n{timer.ElapsedMilliseconds}ms");
    Console.WriteLine($"\n{results.Count()} items");
}

static IObservable<(int currentRequestId, string content)> ExecuteRequests(int totalRequests) =>
    Observable
        .Defer(() =>
        from currentRequestId in Observable.Range(0, totalRequests)
        from content in Observable.Using(() => new HttpClient(), hc =>
            from result in Observable.FromAsync(() => hc.GetAsync($"https://api.pi.delivery/v1/pi?start={currentRequestId * 1000}&numberOfDigits=1000"))
            where result.StatusCode == System.Net.HttpStatusCode.OK
            from json in Observable.FromAsync(() => result.Content.ReadAsStringAsync())
            select JsonSerializer.Deserialize<JsonObject>(json)!["content"]!.ToString())
        // Project to a tuple so the result matches the declared return type.
        select (currentRequestId, content));