I have this code:
var Options = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount * 10,
    CancellationToken = CTS.Token
};

while (!CTS.IsCancellationRequested)
{
    var TasksZ = new[]
    {
        "Task A",
        "Task B",
        "Task C"
    };

    await Parallel.ForEachAsync(TasksZ, Options, async (Comando, Token) =>
    {
        await MyFunction(Comando);
        await Task.Delay(1000, Token);
    });
}
Now, Task A, Task B and Task C start together, and the cycle finishes only when ALL tasks are completed. Let's suppose Task A and Task B finish in 10 seconds, but Task C takes 2 minutes. In this case, A and B also have to wait 2 minutes before they can start again. How can I make this independent? I mean, every task on its own thread, AND considering that TasksZ is loaded dynamically and can change during execution, by adding or removing tasks.
Also, to stop/pause each individual task I need a separate TaskCompletionSource for each one, but MyFunction belongs to an interface shared between the main app and every DLL. Do I need to declare a separate TCS in each DLL, or just one in the common interface?
Edit:
My idea is (using this code from Microsoft) to have an app that runs separate DLLs, all using the same interface, but each one has its own job to do and must not wait for the others. They mainly follow this sequence of work: read a file -> handle an online POST request -> save a file -> communicate the returned JSON to the main app via a custom class -> repeat.
There is no other code I can show you to help you understand, because right now 90% of it is the same as the link above; the other 10% is just the POST request with a JSON result in a custom class, plus the file load/save.
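To give an idea of the shape (this is not the real code, just a simplified illustration with made-up names), every DLL implements the same small interface and the main app only ever calls that:

public interface ITaskModule
{
    // read a file -> online POST request -> save a file -> hand the resulting JSON back to the main app
    Task<MyJsonResult> MyFunction(string comando);
}

where MyJsonResult here stands in for the custom class that carries the returned JSON.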
To be 101% clear, with the example above, the situation should be this:
AM 12:00:00 = start all
AM 12:00:10 = task_A end // 10s
AM 12:00:10 = task_B end // 10s
AM 12:00:20 = task_A end // 10s
AM 12:00:20 = task_B end // 10s
AM 12:00:30 = task_A end // 10s
AM 12:00:30 = task_B end // 10s
...
AM 12:01:50 = task_A end // 10s
AM 12:01:50 = task_B end // 10s
AM 12:02:00 = task_C end // 2 minutes
AM 12:02:10 = task_A end // 10s
AM 12:02:10 = task_B end // 10s
...
(This is because I don't need live data for task_C, so it can POST every 2 minutes or so, but for task_A and task_B I need them live.)
About the cores, the important thing is that the PC doesn't freeze or sit at 100% CPU. The server where I run this is a dual core, so MaxDegreeOfParallelism = Environment.ProcessorCount * 10 was just there to avoid stressing the server too much.
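Basically, the behaviour I'm after is something like this (just a rough sketch I put together, the names are invented, MyFunction is the usual interface call): every task gets its own loop and its own CancellationTokenSource, so stopping or pausing one doesn't touch the others.

using System.Collections.Concurrent;

// Rough sketch only: one independent loop per task, each with its own CTS.
var tokens = new ConcurrentDictionary<string, CancellationTokenSource>();

Task RunLoopAsync(string comando)
{
    var cts = tokens.GetOrAdd(comando, _ => new CancellationTokenSource());
    return Task.Run(async () =>
    {
        while (!cts.Token.IsCancellationRequested)
        {
            await MyFunction(comando);             // 10s for A/B, ~2 minutes for C
            try
            {
                await Task.Delay(1000, cts.Token); // same small pause as in my current code
            }
            catch (TaskCanceledException)
            {
                break;
            }
        }
    });
}

// Nobody waits for anybody else:
var loops = new[] { RunLoopAsync("Task A"), RunLoopAsync("Task B"), RunLoopAsync("Task C") };

// Stopping only Task C later would be:
// tokens["Task C"].Cancel();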
CodePudding user response:
You can use the Parallel.Invoke() method to execute multiple operations at the same time.
var TasksZ = new Action[]
{
    () => MyFunction("Task A"),
    () => MyFunction("Task B"),
    () => MyFunction("Task C")
};

Parallel.Invoke(Options, TasksZ);

void MyFunction(string comando)
{
    Console.WriteLine(comando);
}
CodePudding user response:
As I mentioned in my comment above, you can create your own wrapper around a queue that manages background processors and re-queues the tasks as they complete.
In addition, you mentioned the need to dynamically add or remove tasks at will, which the implementation below handles.
And finally, it takes an external CancellationToken, so you can either call Stop() on the processor itself or cancel the parent CancellationTokenSource.
public class QueueProcessor
{
    // could be replaced with a ref-count solution to ensure
    // all duplicated tasks are removed
    private readonly HashSet<string> _tasksToRemove = new();
    private readonly ConcurrentQueue<string> _taskQueue;
    private Task[] _processors;
    private Func<string, CancellationToken, Task> _processorCallback;
    private CancellationTokenSource _cts;

    public QueueProcessor(
        string[] tasks,
        Func<string, CancellationToken, Task> processorCallback,
        CancellationToken cancellationToken = default)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _taskQueue = new(tasks);
        _processorCallback = processorCallback;
    }

    public async Task StartAsync(int numberOfProcessorThreads)
    {
        _processors = new Task[numberOfProcessorThreads];
        for (int i = 0; i < _processors.Length; i++)
        {
            _processors[i] = Task.Run(async () => await ProcessQueueAsync());
        }
        await Task.WhenAll(_processors);
    }

    public void Stop() => _cts.Cancel();

    public void RemoveTask(string task)
    {
        lock (_tasksToRemove)
        {
            _tasksToRemove.Add(task);
        }
    }

    public void AddTask(string task) => _taskQueue.Enqueue(task);

    private async Task ProcessQueueAsync()
    {
        while (!_cts.IsCancellationRequested)
        {
            if (_taskQueue.TryDequeue(out var task))
            {
                if (ShouldTaskBeRemoved(task))
                {
                    continue;
                }

                await _processorCallback(task, _cts.Token);

                if (!ShouldTaskBeRemoved(task))
                {
                    _taskQueue.Enqueue(task);
                }
            }
            else
            {
                // sleep for a bit before checking for more work
                try
                {
                    await Task.Delay(1000, _cts.Token);
                }
                catch (OperationCanceledException)
                {
                    // cancelled while idle; the loop condition will exit
                }
            }
        }
    }

    private bool ShouldTaskBeRemoved(string task)
    {
        lock (_tasksToRemove)
        {
            if (_tasksToRemove.Contains(task))
            {
                Console.WriteLine($"Task {task} requested for removal");
                _tasksToRemove.Remove(task);
                return true;
            }
        }

        return false;
    }
}
You can test the above with the following:
public async Task MyFunction(string command, CancellationToken cancellationToken)
{
    await Task.Delay(50);
    if (!cancellationToken.IsCancellationRequested)
    {
        Console.WriteLine($"Execute command: {command}");
    }
    else
    {
        Console.WriteLine($"Terminating command: {command}");
    }
}

var cts = new CancellationTokenSource();
var processor = new QueueProcessor(
    new string[] { "Task1", "Task2", "Task3" },
    MyFunction,
    cts.Token);

var runningProcessorTask = processor.StartAsync(2);

await Task.Delay(100);
processor.RemoveTask("Task1");
await Task.Delay(500);
cts.Cancel();
await runningProcessorTask;
This results in the following output:
Execute command: Task2
Execute command: Task1
Execute command: Task3
Execute command: Task2
Task Task1 requested for removal
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task2
Execute command: Task3
Terminating command: Task2
Terminating command: Task3
CodePudding user response:
Let me take a stab at identifying your actual root problem: you have I/O bound operations (network access, file I/O, database queries, etc.) running at the same time as CPU bound operations (whatever processing you do on the results of the former), and because of the way you wrote your code (which you don't show), you have I/O bound operations waiting for CPU bound ones to even start.
I'm guessing that because, by reductio ad absurdum, if everything were CPU bound your cores would be equally busy no matter the order of operations, and if everything were I/O bound the total time would be equally independent of the order; the operations just get woken up when something finally finishes.
If I'm right, then the actual solution is to split your calls between two pools: one for CPU bound operations, capped at the number of available cores, and one for I/O bound operations, capped at some reasonable default such as the maximum number of I/O connections you want in flight at the same time. You can then schedule each operation on the appropriate pool and await it as you normally would, and they'll never step on each other's toes.
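To make that concrete, here is a minimal sketch of the split, assuming a ConcurrentExclusiveSchedulerPair caps the CPU bound side at the core count and a SemaphoreSlim caps how many I/O operations are in flight at once; the class and method names are placeholders, not an existing API:

using System;
using System.Threading;
using System.Threading.Tasks;

static class TwoPools
{
    // CPU bound work goes through a scheduler limited to the number of cores.
    private static readonly ConcurrentExclusiveSchedulerPair CpuPool =
        new(TaskScheduler.Default, maxConcurrencyLevel: Environment.ProcessorCount);

    // I/O bound work stays async and is only capped by a semaphore
    // (16 concurrent operations here, tune it to what your server tolerates).
    private static readonly SemaphoreSlim IoGate = new(16);

    public static Task RunCpuBound(Action work) =>
        Task.Factory.StartNew(
            work,
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            CpuPool.ConcurrentScheduler);

    public static async Task RunIoBound(Func<Task> work)
    {
        await IoGate.WaitAsync();
        try
        {
            await work();
        }
        finally
        {
            IoGate.Release();
        }
    }
}

Hypothetical usage, where DoPostAndSaveAsync and CrunchJson stand in for your real I/O and CPU work:

var ioTask  = TwoPools.RunIoBound(() => DoPostAndSaveAsync());
var cpuTask = TwoPools.RunCpuBound(() => CrunchJson());
await Task.WhenAll(ioTask, cpuTask);

On a dual core box the CPU side is capped at 2, while the I/O side can still keep many requests in flight, because waiting on the network doesn't occupy a thread.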