Add and remove Parallel.Invoke actions during runtime

Time:10-22

I'm using Parallel.Invoke to run certain methods simultaneously and collect the results when all methods are finished.

PROBLEM

As you can see in the "Horrible code" section below, the list of actions is hardcoded to three elements, so the code is useless whenever detectedDevicesList.Count != 3.

TRIED SOLUTIONS

I tried to dynamically create an Action[] array and pass it as the argument to Parallel.Invoke, but I'm unable to convert my existing methods into Tasks, and then into Actions.

NON WORKING CODE

public async Task<Task<String>> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{        
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test calls failed";

    return Task.Run(async () =>
    {
        List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.getAsync();

        if (detectedDevicesList.Count == 0)
        {
            UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
            return TEST_CALLS_FAILED;
        }

        Console.WriteLine("Executing test calls...");

        List<Task<MobileEquipment>> results = new List<Task<MobileEquipment>>();

        //Horrible code begins...
        Parallel.Invoke(() =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[0], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[1], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[2], listBoxLog));
        });

        //Horrible code ends...
        foreach (Task<MobileEquipment> mobileEquipment in results)
        {
            UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.Result.serial + " " + mobileEquipment.Result.operador + ": " + mobileEquipment.Result.callSuccess, true);

            if (!mobileEquipment.Result.callSuccess && sendAlarms)
            {                      
                await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
            }
        }
                      

        UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);

        return TEST_CALLS_COMPLETED;
    });
}

CodePudding user response:

Parallel.For or Parallel.ForEach combined with a concurrent collection should be more appropriate:

// ConcurrentStack<T> exposes Push rather than Add; both are thread-safe.
ConcurrentStack<Task<MobileEquipment>> results = new();
Parallel.ForEach(detectedDevicesList, d => results.Push(new MakePhoneCall().call(d, listBoxLog)));
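
To make the idea concrete, here is a minimal, self-contained sketch of Parallel.ForEach feeding a concurrent collection. It is only an illustration under assumptions: BlockingCall is a hypothetical synchronous stand-in for the test call, devices are reduced to serial strings, and ConcurrentBag<T> is just one of several concurrent collections that would work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachSketch
{
    // Hypothetical blocking stand-in for a synchronous test call.
    static string BlockingCall(string serial)
    {
        Thread.Sleep(100); // simulate slow, blocking work
        return serial + ": ok";
    }

    public static List<string> CallAll(IEnumerable<string> serials)
    {
        // ConcurrentBag<T>.Add is safe to call from several threads at once.
        var results = new ConcurrentBag<string>();

        // One iteration per device, however many devices there are.
        Parallel.ForEach(serials, serial => results.Add(BlockingCall(serial)));

        // The bag is unordered, so sort before reporting.
        return results.OrderBy(r => r, StringComparer.Ordinal).ToList();
    }

    public static void Main()
    {
        foreach (string line in CallAll(new[] { "A1", "B2", "C3", "D4" }))
            Console.WriteLine(line);
    }
}
```

Unlike the hardcoded Parallel.Invoke, the number of parallel iterations here follows the length of the input, so the code does not change when a fourth device is plugged in.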

Another alternative would be Parallel LINQ (PLINQ):

var results = detectedDevicesList
    .AsParallel()
    .Select(d => new MakePhoneCall().call(d, listBoxLog))
    .ToList();

However, it looks like call returns a task, so are you sure it is a slow, blocking call? If not, it might be better to use a regular loop to start the calls and then wait for all of them to complete: await Task.WhenAll in async code, or block with Task.WaitAll. It also looks like your current solution could block on mobileEquipment.Result.

Also note that listBoxLog looks like a UI object, and accessing UI objects from worker threads is not allowed. Using background threads for processing is much easier if the methods are pure and the objects are immutable, i.e. if you avoid side effects that may not be thread-safe. As a general rule, I recommend avoiding multithreaded programming unless a) there is good reason to expect some improvement, and b) you are well aware of the dangers around thread safety.
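
As a rough illustration of the "pure method, immutable objects" advice, the sketch below keeps the worker free of side effects and leaves all reporting to the caller. Everything in it is hypothetical: CallResult, Evaluate and the rule that serials starting with "X" fail are invented for the example and are not the question's MobileEquipment or MakePhoneCall.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PureWorkerSketch
{
    // Hypothetical immutable result type: all state is set in the constructor.
    public sealed class CallResult
    {
        public CallResult(string serial, bool success) { Serial = serial; Success = success; }
        public string Serial { get; }
        public bool Success { get; }
    }

    // Pure: the output depends only on the argument; no UI or shared state is touched.
    static CallResult Evaluate(string serial) =>
        new CallResult(serial, success: !serial.StartsWith("X", StringComparison.Ordinal));

    public static async Task<IReadOnlyList<string>> RunAsync(IEnumerable<string> serials)
    {
        // Workers run on the thread pool and only return values.
        CallResult[] results = await Task.WhenAll(
            serials.Select(s => Task.Run(() => Evaluate(s))));

        // Formatting (and, in a real app, UI updates) happens in the caller after the await.
        return results.Select(r => $"{r.Serial}: {(r.Success ? "ok" : "failed")}").ToList();
    }

    public static async Task Main()
    {
        foreach (string line in await RunAsync(new[] { "A1", "X9" }))
            Console.WriteLine(line);
    }
}
```

Because the workers never touch the list box, there is nothing to synchronize; the only place a UI control would be used is after the await, in the calling code.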

You might also consider using TPL Dataflow to set up a pipeline that does each step of the processing in a parallel and asynchronous way.
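
A two-stage pipeline of that kind could look roughly like the sketch below. It assumes the System.Threading.Tasks.Dataflow library is available (depending on the target framework it may need to be added as a NuGet package), and CallAsync is a hypothetical stand-in for the real test call; the parallelism limit of 4 is an arbitrary choice for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Hypothetical asynchronous stand-in for a test call.
    static async Task<string> CallAsync(string serial)
    {
        await Task.Delay(100); // simulate asynchronous work
        return serial + ": ok";
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> serials)
    {
        var log = new List<string>();

        // Stage 1: up to four calls in flight at once.
        var call = new TransformBlock<string, string>(
            serial => CallAsync(serial),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Stage 2: processes one item at a time by default, so List<T>.Add needs no lock.
        var collect = new ActionBlock<string>(line => log.Add(line));

        // Completion of stage 1 is forwarded to stage 2.
        call.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string serial in serials)
            call.Post(serial);

        call.Complete();          // signal that no more input will arrive
        await collect.Completion; // wait until every result has been collected
        return log;
    }

    public static async Task Main()
    {
        foreach (string line in await RunAsync(new[] { "A1", "B2", "C3" }))
            Console.WriteLine(line);
    }
}
```

In a UI application the second stage would be the natural place to hand results back to the UI thread, which keeps the worker stage free of any reference to listBoxLog.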

CodePudding user response:

You should use Microsoft's Reactive Framework (aka Rx): install the System.Reactive NuGet package and add using System.Reactive.Linq; to your file. Then all of your ugly code becomes this:

IObservable<MobileEquipment> query =
    from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
    from detectedDevice in detectedDevicesList.ToObservable()
    from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
    select mobileEquipment;

The full method now correctly returns Task<String>, rather than Task<Task<String>>.

Here it is:

public async Task<String> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test calls failed";

    IObservable<MobileEquipment> query =
        from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
        from detectedDevice in detectedDevicesList.ToObservable()
        from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
        select mobileEquipment;
        
    IList<MobileEquipment> results = await query.ToList();

    if (results.Count == 0)
    {
        UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
        return TEST_CALLS_FAILED;
    }

    foreach (MobileEquipment mobileEquipment in results)
    {
        UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.serial + " " + mobileEquipment.operador + ": " + mobileEquipment.callSuccess, true);

        if (!mobileEquipment.callSuccess && sendAlarms)
        {
            await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }

    UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);

    return TEST_CALLS_COMPLETED;
}

CodePudding user response:

Parallel.Invoke is not the right tool in this case, because your workload is asynchronous and Parallel.Invoke is not async-friendly. You can solve the problem by creating all the CallAsync tasks at once and then awaiting their completion with the Task.WhenAll method. After the await you are back on the UI thread (assuming the method was called from the UI thread), so you can safely update the UI with the results.

A handy tool for projecting the detected devices to tasks is the LINQ Select operator.

public static async Task CallWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.GetAsync();

    Task<MobileEquipment>[] tasks = detectedDevicesList
        .Select(device => new MakePhoneCall().CallAsync(device))
        .ToArray();

    MobileEquipment[] results = await Task.WhenAll(tasks);

    foreach (var mobileEquipment in results)
    {
        UpdateGui.ListboxAddItem(listBoxLog,
            $"Test call result for {mobileEquipment.Serial} {mobileEquipment.Operador}: {mobileEquipment.CallSuccess}", true);
    }

    foreach (var mobileEquipment in results)
    {
        if (!mobileEquipment.CallSuccess && sendAlarms)
        {
            await SendEmail.SendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }
}