Home > Back-end >  Creating 2 (or more) lists of tasks, and starting them concurrently
Creating 2 (or more) lists of tasks, and starting them concurrently

Time:03-17

I've created two lists of tasks, and I'd like to start processing both lists concurrently.

Although the total number of tasks is relatively small (perhaps less that 10 in total), processing time for each task varies, and I can start two tasks simultaneously as they target a different API.

To differentiate the API I've created two lists, and have populated each task list based on the API that will be utilized.

Within each list of task, I'd like their tasks processed sequentially; however, the processing order in list is not important.

At any given instance, two tasks could be running concurrently as long as the two tasks are not members of same task list.

My first thought was to build along these lines:

var lgoTasks = new List<Task>();
foreach (var x in lgo)
{
  var t = sClient.SubmitToMarket(x.Id);
  lgoTasks.Add(t);
}

var opApiTasks = new List<Task>();
foreach (var x in opApi)
{
  var t = sClient.SubmitToMarket(x.Id);
  opApiTasks.Add(t);
}

await Task.WhenAll(lgoTasks, opApiTasks);

But it fails for two (or more?) reason

  1. WhenAll() won't accept two (or more) Task Arrays
  2. Even if WhenAll() accepted two Task Arrays, the processing isn't sequential

Next I tried this approach:

var lgoCount = lgo.Count();
var opApiCount = opApi.Count();

var maxCounter = lgoCount > opApiCount ? lgoCount : opApiCount;
try
{
  for (int i = 0; i < maxCounter; i  )
  {
    if (lgoCount - 1 >= i && opApiCount - 1 >= i)
    {
      var x = sClient.SubmitToMarket(lgo[i].Id);
      var y = sClient.SubmitToMarket(opApi[i].Id);

      await Task.WhenAll(x, y);
    }
    else if (lgoCount - 1 >= i)
    {
      await sClient.SubmitToMarket(Convert.ToInt32(lgo[i].Id));
    }
    else if (opApiCount - 1 >= i)
    {
      await sClient.SubmitToMarket(Convert.ToInt32(opApi[i].Id));
    }
  }

And although it works somewhat, WHENALL() creates a blocker until both its tasks are completed.

How do I concurrently start each list of tasks, making sure just one task from each list is running, and without creating blockers (like I did) using WHENALL()? Before my method returns control to the caller, all tasks in each task list are required to finish.

Thanks kindly for considering my question, and I look forward to your comments.

CodePudding user response:

Solution

You have: two list of tasks (which all btw. are started, please see below)

You need: two tasks each of which executes its 'children' sequentially.

Here's a toy example:

Console.WriteLine("Main start");
await Task.WhenAll(WorkAsync("A"), WorkAsync("B"));
Console.WriteLine("Main done.");

async Task WorkAsync(string type)
{
    for(int i = 0; i < 3; i   )
        await WaitAndPrintAsync(type i);
}

async Task WaitAndPrintAsync(string s)
{
    Console.WriteLine($"{s} start");
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"{s} end");
}

This results in

Main start
A0 start
B0 start
B0 end
A0 end
A1 start
B1 start
A1 end
B1 end
A2 start
B2 start
B2 end
A2 end
Main done.

Tasks are already started

When you're doing

var lgoTasks = new List<Task>();
foreach (var x in lgo)
{
  var t = sClient.SubmitToMarket(x.Id);
  lgoTasks.Add(t);
}

you are starting the tasks and just not waiting for their finish.

Here's a simple example to prove this.

Console.WriteLine("Main start");

var lgoTasks = new List<Task>();
for (int i = 0; i < 3; i  )
{
    Task t = WaitAndPrintAsync("A"   i);
    lgoTasks.Add(t);
}

var opApiTasks = new List<Task>();
for (int i = 0; i < 3; i  )
{
    Task t = WaitAndPrintAsync("B"   i);
    opApiTasks.Add(t);
}

Console.WriteLine("Before WhenAll");
var allTasks = lgoTasks.Concat(opApiTasks);
await Task.WhenAll(allTasks);

Console.WriteLine("Main done.");

async Task WaitAndPrintAsync(string s)
{
    Console.WriteLine($"{s} start");
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"{s} end");
}

This prints:

Main start
A0 start
A1 start
A2 start
B0 start
B1 start
B2 start
Before WhenAll
B0 end
B2 end
B1 end
A1 end
A2 end
A0 end
Main done.

Notes:

  1. All tasks are started before WhenAll is called.
  2. Since and opApiTasks are list of independent tasks I was able to just concatenate both lists and use Task WhenAll(IEnumerable<Task> tasks); to wait for them to finish.

CodePudding user response:

Does SubmitToMarket start already a new task and return it? Then e. g. all lgoTasks will run concurrently! But AFAIK your description, my solution would be about (no warranty for syntax errors):

var lgoSequenceTask = new Task(async () =>
        {
            foreach (var x in lgo) // execute in sequence (in different tasks)
            {
                await sClient.SubmitToMarket(x.Id);
            }
        }
    );

var opApiSequenceTask = new Task(async () => 
        {
            foreach (var x in opApi) // execute in sequence (in different tasks)
            {
                await sClient.SubmitToMarket(x.Id);
            }
        }
    );

var bothTasks = new List<Task>() { lgoSequenceTask, opApiSequenceTask};

foreach (var task in bothTasks) // start both sequences concurrently
{
    task.Start(); 
}

await Task.WhenAll(bothTasks);

My understandig is that you want to have such a timing as an example:

        a                               b
lgo:    <-L1-><----L2---><-L3-><---L4-->|
opApi:   <--O1--><-O2-><----O3--->      |

times:
a: both tasks start in loop,
b: Task.WhenAll finished at |

CodePudding user response:

You should use Microsoft's Reactive Framework (aka Rx) - NuGet System.Reactive and add using System.Reactive.Linq; - then you can do this:

var query1 = lgo.ToObservable().SelectMany(x => Observable.FromAsync(() => sClient.SubmitToMarket(x.Id)));
var query2 = opApi.ToObservable().SelectMany(x => Observable.FromAsync(() => sClient.SubmitToMarket(x.Id)));

await query1.Merge(query2);

Alternatively, if you don't want the beauty of Rx, then just do this for your WhenAll:

await Task.WhenAll(lgoTasks.Concat(opApiTasks));
  • Related