I'm using Parallel.Invoke to run certain methods simultaneously and collect the results when all methods are finished.
PROBLEM
As you can see in the "horrible code" section, the list of actions is hardcoded to three elements, so the method is useless whenever detectedDevicesList.Count != 3.
TRIED SOLUTIONS
I tried to dynamically create an Action[] array and pass it as the argument to Parallel.Invoke, but I'm unable to convert my existing methods into Tasks, and then into Actions.
NON WORKING CODE
public async Task<Task<String>> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test calls failed";
    return Task.Run(async () =>
    {
        List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.getAsync();
        if (detectedDevicesList.Count == 0)
        {
            UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
            return TEST_CALLS_FAILED;
        }
        Console.WriteLine("Executing test calls...");
        List<Task<MobileEquipment>> results = new List<Task<MobileEquipment>>();
        //Horrible code begins...
        Parallel.Invoke(() =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[0], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[1], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[2], listBoxLog));
        });
        //Horrible code ends...
        foreach (Task<MobileEquipment> mobileEquipment in results)
        {
            UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.Result.serial + " " + mobileEquipment.Result.operador + ": " + mobileEquipment.Result.callSuccess, true);
            if (!mobileEquipment.Result.callSuccess && sendAlarms)
            {
                await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
            }
        }
        UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);
        return TEST_CALLS_COMPLETED;
    });
}
CodePudding user response:
Parallel.For or Parallel.ForEach, together with a concurrent collection, should be more appropriate:
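// ConcurrentBag (System.Collections.Concurrent) has a thread-safe Add; ConcurrentStack only offers Push.
ConcurrentBag<Task<MobileEquipment>> results = new();
Parallel.ForEach(detectedDevicesList, d => results.Add(new MakePhoneCall().call(d, listBoxLog)));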
Another alternative would be Parallel LINQ (PLINQ):
var results = detectedDevicesList.AsParallel()
    .Select(d => new MakePhoneCall().call(d, listBoxLog))
    .ToList();
However, it looks like call returns a task, so are you sure it is a slow blocking call? If not, it might be better to use a regular loop to start the calls, and then use Task.WaitAll to wait (or Task.WhenAll to await) for them to complete. It also looks like your current solution could block on mobileEquipment.Result.
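The regular-loop idea could be sketched roughly as follows, assuming the call really is asynchronous. FakeCallAsync and the serial strings are hypothetical stand-ins for the asker's MakePhoneCall().call and device list, and Task.Delay only simulates the call duration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAllLoopDemo
{
    // Hypothetical stand-in for an asynchronous phone call; not the asker's real API.
    public static async Task<string> FakeCallAsync(string serial)
    {
        await Task.Delay(100); // simulates waiting on the device without blocking a thread
        return serial + ": ok";
    }

    public static async Task<string[]> CallAllAsync(IReadOnlyList<string> serials)
    {
        var tasks = new List<Task<string>>();
        foreach (string serial in serials)
        {
            // Starting a task does not wait for it, so all calls run concurrently.
            tasks.Add(FakeCallAsync(serial));
        }
        // Task.WhenAll yields the results in the same order as the input tasks.
        return await Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        string[] results = await CallAllAsync(new[] { "A1", "B2", "C3", "D4" });
        foreach (string line in results)
        {
            Console.WriteLine(line);
        }
    }
}
```

Because the loop works for any number of devices, nothing needs to be hardcoded, and no thread is blocked while the calls are in flight.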
Also note that listBoxLog looks like a UI object, and accessing UI objects from worker threads is not allowed. Using background threads for processing is much easier if the methods are pure and the objects are immutable, i.e. if you avoid side effects that may not be thread-safe. As a general rule, I recommend avoiding multithreaded programming unless a) there is good reason to expect some improvement, and b) you are well aware of the dangers around thread safety.
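To illustrate what "pure and immutable" can look like, here is a small sketch. The Device and CallResult records and the Evaluate rule are all hypothetical, invented only for this example; the point is that the parallel part touches no UI and no shared state, and results are reported only after the parallel work is done:

```csharp
using System;
using System.Linq;

// Immutable data carriers (hypothetical, for illustration only).
public sealed record Device(string Serial);
public sealed record CallResult(string Serial, bool Success);

public static class PureWorkDemo
{
    // Pure: the result depends only on the argument; no shared state or UI is touched.
    // The even-length rule is an arbitrary placeholder for real work.
    public static CallResult Evaluate(Device device) =>
        new CallResult(device.Serial, device.Serial.Length % 2 == 0);

    public static CallResult[] EvaluateAll(Device[] devices) =>
        devices.AsParallel()
               .AsOrdered()      // keep results in input order
               .Select(Evaluate) // safe to run on worker threads because Evaluate is pure
               .ToArray();

    public static void Main()
    {
        var devices = new[] { new Device("AB"), new Device("CDE"), new Device("FGHI") };
        // Reporting (the UI update in a real app) happens afterwards, on the calling thread.
        foreach (CallResult r in EvaluateAll(devices))
        {
            Console.WriteLine($"{r.Serial}: {r.Success}");
        }
    }
}
```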
You might also consider using TPL Dataflow to set up a pipeline that does each step of the processing in a parallel and asynchronous way.
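A minimal sketch of such a pipeline, under some assumptions: the serial strings and the simulated call are hypothetical placeholders, the parallelism limit of 4 is arbitrary, and depending on the target framework you may need to add the System.Threading.Tasks.Dataflow NuGet package:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    public static async Task<List<string>> RunAsync(IEnumerable<string> serials)
    {
        var log = new List<string>();

        // Stage 1: run up to 4 simulated calls concurrently.
        var callBlock = new TransformBlock<string, string>(
            async serial =>
            {
                await Task.Delay(50); // hypothetical stand-in for the real call
                return serial + ": ok";
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Stage 2: an ActionBlock handles one item at a time by default,
        // so the plain List needs no extra locking here.
        var logBlock = new ActionBlock<string>(line => log.Add(line));

        // Completion of stage 1 flows on to stage 2.
        callBlock.LinkTo(logBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string serial in serials)
        {
            callBlock.Post(serial);
        }
        callBlock.Complete();      // no more input
        await logBlock.Completion; // wait until every item has passed through both stages
        return log;
    }

    public static async Task Main()
    {
        List<string> lines = await RunAsync(new[] { "A1", "B2", "C3" });
        foreach (string line in lines)
        {
            Console.WriteLine(line);
        }
    }
}
```

In a real application the second stage would be the place for logging or sending alarm emails, while UI updates would still have to be marshalled to the UI thread.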
CodePudding user response:
You should use Microsoft's Reactive Framework (aka Rx) - NuGet System.Reactive and add using System.Reactive.Linq; - then all of your ugly code becomes this:
IObservable<MobileEquipment> query =
    from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
    from detectedDevice in detectedDevicesList.ToObservable()
    from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
    select mobileEquipment;
The full method now correctly returns Task<String>, rather than Task<Task<String>>.
Here it is:
public async Task<String> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test calls failed";
    IObservable<MobileEquipment> query =
        from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
        from detectedDevice in detectedDevicesList.ToObservable()
        from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
        select mobileEquipment;
    IList<MobileEquipment> results = await query.ToList();
    if (results.Count == 0)
    {
        UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
        return TEST_CALLS_FAILED;
    }
    foreach (MobileEquipment mobileEquipment in results)
    {
        UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.serial + " " + mobileEquipment.operador + ": " + mobileEquipment.callSuccess, true);
        if (!mobileEquipment.callSuccess && sendAlarms)
        {
            await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }
    UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);
    return TEST_CALLS_COMPLETED;
}
CodePudding user response:
Parallel.Invoke is not the correct tool to use in this case because your workload is asynchronous, and Parallel.Invoke is not async-friendly. Your problem can be solved by just creating all the CallAsync tasks at once, and then awaiting all of them to complete with the Task.WhenAll method. After the await you are back on the UI thread, and you can safely update the UI with the results.
A handy tool for projecting the detected devices to tasks is the Select LINQ operator.
public static async Task CallWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.GetAsync();
    Task<MobileEquipment>[] tasks = detectedDevicesList
        .Select(device => new MakePhoneCall().CallAsync(device))
        .ToArray();
    MobileEquipment[] results = await Task.WhenAll(tasks);
    foreach (var mobileEquipment in results)
    {
        UpdateGui.ListboxAddItem(listBoxLog,
            $"Test call result for {mobileEquipment.Serial} {mobileEquipment.Operador}: {mobileEquipment.CallSuccess}", true);
    }
    foreach (var mobileEquipment in results)
    {
        if (!mobileEquipment.CallSuccess && sendAlarms)
        {
            await SendEmail.SendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }
}