Execute Parallel.ForEach based in a List that needs to receive more items in execution time


I have a challenge here, and it's giving me a headache to find a solution.

I have a List of something, and I execute the Parallel.ForEach based on it:

List<Customer> customers = GetNotProcessedCustomers();

Parallel.ForEach(customers, new ParallelOptions { MaxDegreeOfParallelism = 2 },
cust =>
{
    ExecuteSomething(cust);
});

The problem is that I need to call GetNotProcessedCustomers again to check whether new unprocessed items are available in the database, while this parallel loop is still running. Calling the method again is OK, but how can I insert new items into the List that the Parallel.ForEach is already consuming?

In other words, the List<Customer> is alive, and I need to keep inserting items into it all the time, reusing the available threads of the existing Parallel loop. Take a look:

List<Customer> customers = GetNotProcessedCustomers(); // get unprocessed customers from the database

Parallel.ForEach(customers, ...); // ...... start the parallel loop ...

customers.AddRange(GetNotProcessedCustomers()); // read the database again...

"Hey Parallel, do you have any thread available ?" If yes, use it.

I'm open to other approaches and ideas, like Threads, ThreadPool...

Could someone help me please?

CodePudding user response:

There are probably better ways to do this job than the Parallel class, with an ActionBlock<Customer> from the TPL Dataflow library being the most promising candidate. But if you want to do your job using the knowledge you already have, you could feed the parallel loop with a deferred IEnumerable<Customer> sequence instead of a materialized List<Customer>. This sequence will be querying the database and yielding the not-processed customers in a never ending loop. It might be a good idea to add a Task.Delay in the mix, to ensure that the database will not be queried more frequently than every X seconds.

IEnumerable<Customer> GetNotProcessedCustomersNonStop(
    CancellationToken cancellationToken = default)
{
    while (true)
    {
        // Start the delay before querying, so the time spent querying
        // and processing counts toward the 5-second polling interval.
        var delayTask = Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        foreach (var customer in GetNotProcessedCustomers())
            yield return customer;
        // Block synchronously; iterator methods cannot be async.
        delayTask.GetAwaiter().GetResult();
    }
}

Adding a CancellationToken in the mix is probably also a good idea, because eventually you want to stop the loop, don't you?

If you are not familiar with deferred enumerable sequences and the yield statement, you can take a look at this document: Iterators
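To see what "deferred" means in practice, here is a minimal stand-alone illustration (the Numbers method is hypothetical and unrelated to the customer code):

```csharp
using System;
using System.Collections.Generic;

class IteratorDemo
{
    // The body of an iterator method does not run when it is called;
    // it runs lazily, one step per item, as the caller enumerates.
    static IEnumerable<int> Numbers()
    {
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine($"producing {i}");
            yield return i;
        }
    }

    static void Main()
    {
        var sequence = Numbers(); // nothing printed yet: execution is deferred

        foreach (var n in sequence)
            Console.WriteLine($"consuming {n}");
        // The producing/consuming lines interleave, showing that items
        // are produced on demand rather than all up front.
    }
}
```

The same mechanism is what lets GetNotProcessedCustomersNonStop query the database lazily, batch by batch, while the parallel loop consumes it.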

One final important detail is to tell the Parallel class that you don't want it to do fancy things, like enumerating greedily the enumerable and caching its items. You want it to grab the next customer only when it's ready to process it. You can do it by throwing a Partitioner.Create in the mix. Putting everything together:

var cts = new CancellationTokenSource();

var source = Partitioner.Create(GetNotProcessedCustomersNonStop(cts.Token),
    EnumerablePartitionerOptions.NoBuffering);

var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = 2,
    CancellationToken = cts.Token,
};

Parallel.ForEach(source, parallelOptions, customer =>
{
    ProcessCustomer(customer);
});

//cts.Cancel(); // eventually...
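For completeness, the ActionBlock<Customer> alternative mentioned at the top could look roughly like this. It is a sketch, not a drop-in solution: Customer, GetNotProcessedCustomers and ProcessCustomer are stubs standing in for your real types, and the polling interval is arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

class Customer { public int Id; }

class Demo
{
    // Stub standing in for the real database query (hypothetical).
    static int nextId = 0;
    static IEnumerable<Customer> GetNotProcessedCustomers()
    {
        for (int i = 0; i < 3; i++)
            yield return new Customer { Id = nextId++ };
    }

    static void ProcessCustomer(Customer c) =>
        Console.WriteLine($"processed {c.Id}");

    static async Task Main()
    {
        // The block processes at most 2 customers concurrently,
        // buffering the rest internally.
        var block = new ActionBlock<Customer>(
            c => ProcessCustomer(c),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        using var cts = new CancellationTokenSource();

        // Producer: poll the "database" periodically and feed the block.
        var producer = Task.Run(async () =>
        {
            while (!cts.Token.IsCancellationRequested)
            {
                foreach (var customer in GetNotProcessedCustomers())
                    block.Post(customer);
                try { await Task.Delay(TimeSpan.FromSeconds(5), cts.Token); }
                catch (OperationCanceledException) { }
            }
        });

        await Task.Delay(100); // let one polling pass run, for the demo
        cts.Cancel();
        await producer;
        block.Complete();
        await block.Completion; // wait for in-flight customers to finish
    }
}
```

The advantage over Parallel.ForEach here is that producing and consuming are decoupled: the producer can keep posting new customers at any time, and the block hands them to an available worker as soon as one is free.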