I receive a sequence of objects (e.g., ItemGroup[]) that each contain multiple jobs (e.g., Item[]) and a max degree of parallelism value, for example:
public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);
The sequence of ItemGroup instances must be processed sequentially, but each ItemGroup may have a max degree of parallelism higher than 1. For example, the pipeline will process the group of A* items sequentially, then process the group of B* items concurrently:
var groups = new[]
{
    new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
    new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};
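To pin down the intended semantics, here is a sketch that does not use TPL Dataflow at all: it swaps in Parallel.ForEachAsync (available since .NET 6) and awaits each group before starting the next. Tuples stand in for the Item/ItemGroup records, and the processed queue and the Task.Delay "work" are hypothetical placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for ItemGroup/Item: item names plus the group's max DOP.
var groups = new (string[] Items, int MaxDop)[]
{
    (new[] { "A0", "A1", "A2" }, 1),
    (new[] { "B0", "B1", "B2" }, 3),
};

// Records the order in which items finish; thread-safe because
// items inside a group may complete concurrently.
var processed = new ConcurrentQueue<string>();

// Groups run strictly one after another; items inside a group
// run with at most that group's degree of parallelism.
foreach (var group in groups)
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = group.MaxDop };
    await Parallel.ForEachAsync(group.Items, options, async (item, ct) =>
    {
        await Task.Delay(10, ct); // placeholder for the real job
        processed.Enqueue(item);
    });
}

Console.WriteLine(string.Join(", ", processed));
```

The A items always finish in order before any B item starts, while the B items may finish in any order. Unlike a dataflow block, this loop needs all groups up front, so it is not a drop-in replacement for a block that receives groups while a producer posts them.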
I thought a custom block implementing IPropagatorBlock<ItemGroup, Item>, built around a TransformManyBlock, would be a good choice, but I am not clear how to properly wait on the block that is created dynamically inside it each time the producer posts an ItemGroup instance.
Can anyone guide me here?
Answer:
You could create an inner TransformBlock<Item, Item> for each ItemGroup received. Below is a generalized solution with TInput, TChild and TOutput generic parameters. TInput corresponds to ItemGroup, TChild corresponds to Item, and TOutput is also Item, since you propagate the items without transforming them:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Place this method inside a static class.
public static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock
    <TInput, TChild, TOutput>(
    Func<TInput, IEnumerable<TChild>> childrenSelector,
    Func<TInput, int> degreeOfParallelismSelector,
    Func<TChild, TOutput> transformChild)
{
    ArgumentNullException.ThrowIfNull(childrenSelector);
    ArgumentNullException.ThrowIfNull(degreeOfParallelismSelector);
    ArgumentNullException.ThrowIfNull(transformChild);
    // The outer block keeps the default MaxDegreeOfParallelism = 1,
    // so the inputs (groups) are processed one at a time, in order.
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        // A fresh inner block per input, limited to that input's DOP.
        TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
        {
            MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
        });
        foreach (var child in childrenSelector(input))
        {
            bool accepted = innerBlock.Post(child);
            if (!accepted) break; // The innerBlock has failed
        }
        innerBlock.Complete();
        // Drain the inner block's output until it completes
        List<TOutput> results = new();
        while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
            while (innerBlock.TryReceive(out TOutput result))
                results.Add(result);
        // Surface a failure of the inner block as a failure of the outer block
        try { await innerBlock.Completion.ConfigureAwait(false); }
        catch when (innerBlock.Completion.IsCanceled) { throw; }
        catch { innerBlock.Completion.Wait(); } // Propagate AggregateException
        return results;
    });
}
Usage example:
IPropagatorBlock<ItemGroup, Item> block =
    CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
        x => x.Items, x => x.MaxDegreeOfParallelism, x => x);
Note: The above code has not been tested.
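The usage example only creates the block. Below is a minimal end-to-end sketch of feeding and consuming it, assuming .NET 6 or later with top-level statements (add the System.Threading.Tasks.Dataflow NuGet package if the namespace is not available in your project). It repeats the factory as a local function so the program compiles on its own. The tuples, the gate/running/maxRunning bookkeeping, and the ActionBlock sink are hypothetical demo scaffolding, not part of the answer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo data; tuples stand in for the ItemGroup/Item records.
var groups = new (string[] Items, int MaxDop)[]
{
    (new[] { "A0", "A1", "A2" }, 1),
    (new[] { "B0", "B1", "B2" }, 3),
};

var gate = new object();
int running = 0, maxRunning = 0; // demo-only concurrency bookkeeping

var block = CreateTransformManyDynamicBlock<(string[] Items, int MaxDop), string, string>(
    g => g.Items,
    g => g.MaxDop,
    item =>
    {
        lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
        Thread.Sleep(50); // simulated work
        lock (gate) { running--; }
        return item; // items pass through unchanged, as in the question
    });

// ActionBlock defaults to MaxDegreeOfParallelism = 1,
// so the List is never accessed concurrently.
var outputs = new List<string>();
var sink = new ActionBlock<string>(x => outputs.Add(x));
block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var group in groups) block.Post(group);
block.Complete();
await sink.Completion;

Console.WriteLine(string.Join(", ", outputs)); // A0, A1, A2, B0, B1, B2
Console.WriteLine($"Max concurrent transforms: {maxRunning}");

// Copy of the factory from the answer (argument checks omitted).
static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock<TInput, TChild, TOutput>(
    Func<TInput, IEnumerable<TChild>> childrenSelector,
    Func<TInput, int> degreeOfParallelismSelector,
    Func<TChild, TOutput> transformChild)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
        {
            MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
        });
        foreach (var child in childrenSelector(input))
            if (!innerBlock.Post(child)) break;
        innerBlock.Complete();
        List<TOutput> results = new();
        while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
            while (innerBlock.TryReceive(out TOutput result))
                results.Add(result);
        try { await innerBlock.Completion.ConfigureAwait(false); }
        catch when (innerBlock.Completion.IsCanceled) { throw; }
        catch { innerBlock.Completion.Wait(); }
        return results;
    });
}
```

The output order is deterministic because the inner TransformBlock preserves input order by default (EnsureOrdered is true) and the outer block handles one group at a time. The maximum concurrency depends on the thread pool, but it should never exceed the largest group's degree of parallelism.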
Update: My original implementation (revision 1) was based on the .NET 7 API ReceiveAllAsync, and on the TransformManyBlock constructor that takes a Func<TInput, IAsyncEnumerable<TOutput>> argument (also .NET 7). The problem was that ReceiveAllAsync doesn't propagate the exception of the enumerated dataflow block, so I switched to collecting the results manually in a List<TOutput> and propagating them from there, as shown in this answer.
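A small demo of the pitfall mentioned in the update, assuming a .NET version that includes DataflowBlock.ReceiveAllAsync. The failing TransformBlock and the "boom" exception are hypothetical; the point is that the enumeration ends quietly while the error is only visible through the block's Completion task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical block that fails on the second item.
var block = new TransformBlock<int, int>(x =>
    x == 2 ? throw new InvalidOperationException("boom") : x);
block.Post(1);
block.Post(2);
block.Post(3);
block.Complete();

var received = new List<int>();
bool enumerationThrew = false;
try
{
    // The enumeration simply stops when the block completes, faulted or not.
    await foreach (var value in block.ReceiveAllAsync())
        received.Add(value);
}
catch (Exception)
{
    enumerationThrew = true;
}

// The failure is only observable by awaiting the Completion task.
Exception? failure = null;
try { await block.Completion; }
catch (Exception ex) { failure = ex; }

Console.WriteLine($"Received: [{string.Join(", ", received)}]");
Console.WriteLine($"Enumeration threw: {enumerationThrew}");
Console.WriteLine($"Completion failed with: {failure?.GetType().Name}");
```

This is why the answer drains the inner block with OutputAvailableAsync/TryReceive and then awaits innerBlock.Completion separately: without that second step, a failing child would silently truncate the group's results.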