I often find myself in a situation where I want to stream data in chunks rather than one by one. Usually I do this when I need to do some I/O-bound operation, like database inserts, where I want to limit round trips. So I wrote this nice little extension method:
public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> data, int size)
{
    using (var enumerator = data.GetEnumerator())
    {
        // Each outer MoveNext() that succeeds starts a new batch.
        while (enumerator.MoveNext())
        {
            yield return YieldBatchElements(enumerator, size - 1).ToList();
        }
    }

    // Yields the current element plus up to batchSize further elements.
    IEnumerable<TU> YieldBatchElements<TU>(
        IEnumerator<TU> source,
        int batchSize)
    {
        yield return source.Current;
        for (var i = 0; i < batchSize && source.MoveNext(); i++)
        {
            yield return source.Current;
        }
    }
}
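Flattening the nested iterator into a single loop makes one detail easier to see: a batch shorter than `size` is only yielded after the source's `MoveNext()` has returned `false`. The sketch below is a hypothetical rewrite, not the original method; `SplitFlat` and `SplitFlatDemo` are made-up names, and it assumes `size >= 1`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SplitFlatDemo
{
    // Hypothetical flattened variant of the question's Split (not the original code).
    // Assumes size >= 1.
    public static IEnumerable<List<T>> SplitFlat<T>(this IEnumerable<T> data, int size)
    {
        using (var enumerator = data.GetEnumerator())
        {
            // A new batch starts only when the source reports another element.
            while (enumerator.MoveNext())
            {
                var batch = new List<T>(size) { enumerator.Current };

                // Top the batch up to `size`. If it ends up shorter, that is only
                // because MoveNext() returned false, i.e. the source said "done".
                while (batch.Count < size && enumerator.MoveNext())
                {
                    batch.Add(enumerator.Current);
                }

                yield return batch;
            }
        }
    }

    public static void Main()
    {
        var sizes = Enumerable.Range(0, 10).SplitFlat(3).Select(b => b.Count);
        Console.WriteLine(string.Join(", ", sizes)); // 3, 3, 3, 1
    }
}
```

With an in-memory source such as `Enumerable.Range`, `MoveNext()` returns `false` right after the last element, which is why the trailing one-element chunk appears in that case.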
This works just fine, but I noticed that it does not work with BlockingCollection<T>.GetConsumingEnumerable().
I created the following test method to demonstrate my findings:
[Test]
public static void ConsumeTest()
{
    var queue = new BlockingCollection<int>();
    var i = 0;
    foreach (var x in Enumerable.Range(0, 10).Split(3))
    {
        Console.WriteLine($"Fetched chunk: {x.Count}");
        Console.WriteLine($"Fetched total: {i += x.Count}");
    }
    //Fetched chunk: 3
    //Fetched total: 3
    //Fetched chunk: 3
    //Fetched total: 6
    //Fetched chunk: 3
    //Fetched total: 9
    //Fetched chunk: 1
    //Fetched total: 10

    Task.Run(
        () =>
        {
            foreach (var x in Enumerable.Range(0, 10))
            {
                queue.Add(x);
            }
        });

    i = 0;
    foreach (var element in queue.GetConsumingEnumerable(
        new CancellationTokenSource(3000).Token).Split(3))
    {
        Console.WriteLine($"Fetched chunk: {element.Count}");
        Console.WriteLine($"Fetched total: {i += element.Count}");
    }
    //Fetched chunk: 3
    //Fetched total: 3
    //Fetched chunk: 3
    //Fetched total: 6
    //Fetched chunk: 3
    //Fetched total: 9
}
Apparently the last chunk is being "dropped" when it has fewer elements than the chunk size. Any ideas?
Answer:
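You need to call CompleteAdding() on the BlockingCollection<T> once the producer has added its last element. That tells GetConsumingEnumerable() that no more elements will arrive, so its enumerator's MoveNext() returns false as soon as the queue is empty, and Split can yield the final, shorter chunk. Without it, the consumer keeps waiting for a next element that never comes; when the 3-second token is cancelled, GetConsumingEnumerable() throws an OperationCanceledException while the last chunk is still being collected, so that chunk is never yielded.
The code below prints the missing lines as well: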
Task.Run(() =>
{
    foreach (var x in Enumerable.Range(0, 10))
    {
        queue.Add(x);
    }
    // Signal that no more elements will be added.
    queue.CompleteAdding();
});
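For a complete picture, here is a sketch that puts the question's Split and the producer/consumer pair into one console program and runs both cases. `ConsumeDemo`, `ConsumeInBatches` and its `completeAdding`/`timeout` parameters are illustrative names introduced here, not part of the original post, and the 1-second timeout is an arbitrary assumption to keep the run short. With CompleteAdding() the short last batch arrives; without it, cancellation surfaces as an OperationCanceledException and the half-built batch is lost.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumeDemo
{
    // The question's Split, with the loop increment restored (i++).
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> data, int size)
    {
        using (var enumerator = data.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
                yield return YieldBatchElements(enumerator, size - 1).ToList();
            }
        }

        IEnumerable<TU> YieldBatchElements<TU>(IEnumerator<TU> source, int batchSize)
        {
            yield return source.Current;
            for (var i = 0; i < batchSize && source.MoveNext(); i++)
            {
                yield return source.Current;
            }
        }
    }

    // Hypothetical helper: produces 0..count-1 on a background task, consumes
    // them in batches and returns the sizes of the batches actually received.
    public static List<int> ConsumeInBatches(int count, int size, bool completeAdding, TimeSpan timeout)
    {
        var received = new List<int>();
        using (var queue = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource(timeout))
        {
            var producer = Task.Run(() =>
            {
                foreach (var x in Enumerable.Range(0, count))
                {
                    queue.Add(x);
                }
                if (completeAdding)
                {
                    // Lets the consuming enumerator's MoveNext() return false
                    // once the queue has been drained.
                    queue.CompleteAdding();
                }
            });

            try
            {
                foreach (var batch in queue.GetConsumingEnumerable(cts.Token).Split(size))
                {
                    received.Add(batch.Count);
                }
            }
            catch (OperationCanceledException)
            {
                // Without CompleteAdding the consumer blocks waiting for another
                // element; cancellation aborts the partially filled last batch.
            }

            producer.Wait();
        }
        return received;
    }

    public static void Main()
    {
        var timeout = TimeSpan.FromSeconds(1);
        Console.WriteLine(string.Join(", ", ConsumeInBatches(10, 3, true, timeout)));  // 3, 3, 3, 1
        Console.WriteLine(string.Join(", ", ConsumeInBatches(10, 3, false, timeout))); // 3, 3, 3
    }
}
```

The second call takes roughly the timeout to return, because the consumer only gives up when the token is cancelled.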
See the documentation for BlockingCollection<T>.GetConsumingEnumerable for more information.