I have a challenge here, and it's giving me a headache to find a solution. I have a List<Customer>, and I execute a Parallel.ForEach over it:

List<Customer> customers = GetNotProcessedCustomers();
Parallel.ForEach(customers, new ParallelOptions { MaxDegreeOfParallelism = 2 },
    cust =>
    {
        ExecuteSomething(cust);
    });
The problem here is that I need to call GetNotProcessedCustomers again, to check whether new unprocessed items have appeared in the database, while this parallel loop is still running. Calling the method again is OK, but how can I insert new items into the List that the parallel loop is already using? In other words, the List<Customer> is live, and I need to keep inserting items into it all the time, reusing the available threads of the existing Parallel loop. Take a look:
List<Customer> customers = GetNotProcessedCustomers(); // get unprocessed customers from the database
Parallel.ForEach(customers, ...);                      // ...... start the parallel loop ...
customers.AddRange(GetNotProcessedCustomers());        // read the database again...
// "Hey Parallel, do you have a thread available?" If yes, use it.
I can accept other approaches and ideas as well, like Thread, ThreadPool, etc. Could someone help me, please?
CodePudding user response:
There are probably better ways to do this job than the Parallel class, with an ActionBlock<Customer> from the TPL Dataflow library being the most promising candidate. But if you want to do the job using the knowledge you already have, you could feed the parallel loop with a deferred IEnumerable<Customer> sequence instead of a materialized List<Customer>. This sequence queries the database and yields the not-yet-processed customers in a never-ending loop. It might be a good idea to add a Task.Delay in the mix, to ensure that the database will not be queried more frequently than every X seconds.
IEnumerable<Customer> GetNotProcessedCustomersNonStop(
    CancellationToken cancellationToken = default)
{
    while (true)
    {
        var delayTask = Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        foreach (var customer in GetNotProcessedCustomers())
            yield return customer;
        delayTask.GetAwaiter().GetResult();
    }
}
Adding a CancellationToken in the mix is probably also a good idea, because eventually you'll want to stop the loop, won't you? If you are not familiar with deferred enumerable sequences and the yield statement, you can take a look at this document: Iterators
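To illustrate why a deferred sequence works here: the iterator's body only runs when a consumer pulls the next element, which is exactly the property the parallel loop relies on. A minimal self-contained sketch (Numbers is a made-up example, not part of the question's code):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class IteratorDemo
{
    // A deferred, infinite sequence: nothing executes until enumeration
    // begins, and each element is produced only on demand.
    public static IEnumerable<int> Numbers()
    {
        for (int i = 1; ; i++)
        {
            Console.WriteLine($"yielding {i}");
            yield return i;
        }
    }

    static void Main()
    {
        // Take(3) pulls exactly three elements from the infinite sequence;
        // the loop body never runs a fourth time.
        foreach (var n in Numbers().Take(3))
            Console.WriteLine($"got {n}");
    }
}
```

Notice that "yielding" and "got" messages interleave one by one, confirming that production happens lazily, on the consumer's schedule, rather than all up front.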
One final important detail is to tell the Parallel class that you don't want it to do fancy things, like greedily enumerating the sequence and buffering its items. You want it to grab the next customer only when it's ready to process it. You can do that by throwing a Partitioner.Create in the mix. Putting everything together:
var cts = new CancellationTokenSource();
var source = Partitioner.Create(GetNotProcessedCustomersNonStop(cts.Token),
    EnumerablePartitionerOptions.NoBuffering);
var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = 2,
    CancellationToken = cts.Token,
};
Parallel.ForEach(source, parallelOptions, customer =>
{
    ProcessCustomer(customer);
});
//cts.Cancel(); // eventually...
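For completeness, here is a rough sketch of the ActionBlock<Customer> alternative mentioned at the top. It assumes the System.Threading.Tasks.Dataflow NuGet package; the Customer type and the GetNotProcessedCustomers/ProcessCustomer methods below are hypothetical stand-ins for the question's own code:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

class Customer { public int Id; }

class Program
{
    // Stand-ins for the question's database query and processing logic.
    public static Customer[] GetNotProcessedCustomers() => new[] { new Customer { Id = 1 } };
    static void ProcessCustomer(Customer c) => Console.WriteLine($"Processed {c.Id}");

    static async Task Main()
    {
        var block = new ActionBlock<Customer>(
            customer => ProcessCustomer(customer),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        // A simple polling loop can keep posting newly discovered customers
        // while the block processes earlier ones on up to 2 concurrent workers.
        for (int i = 0; i < 3; i++)
        {
            foreach (var customer in GetNotProcessedCustomers())
                block.Post(customer);
            await Task.Delay(TimeSpan.FromSeconds(5));
        }

        block.Complete();       // signal that no more items will be posted
        await block.Completion; // wait for in-flight work to finish
    }
}
```

The advantage over the Parallel.ForEach approach is that producing (polling the database) and consuming (processing customers) are naturally decoupled: you can Post items at any time, from any thread, without needing a never-ending enumerable.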