Cannot retrieve chunks of data from BlockingCollection<T>

Time:01-24

I often want to stream data in chunks rather than one item at a time. Usually this is for I/O-bound operations, such as database inserts, where I want to limit round trips. So I wrote this small extension method:

        public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> data, int size)
        {            
            using (var enumerator = data.GetEnumerator())
            {
                while (enumerator.MoveNext())
                {
                    yield return YieldBatchElements(enumerator, size - 1).ToList();
                }
            }

            IEnumerable<TU> YieldBatchElements<TU>(
                IEnumerator<TU> source,
                int batchSize)
            {
                yield return source.Current;
                for (var i = 0; i < batchSize && source.MoveNext(); i++)
                {
                    yield return source.Current;
                }
            }
        }

This works fine, but I noticed that it does not work with BlockingCollection<T>.GetConsumingEnumerable.

I created the following test method to demonstrate my findings:

        [Test]
        public static void ConsumeTest()
        {
            var queue = new BlockingCollection<int>();
            var i = 0;
            foreach (var x in Enumerable.Range(0, 10).Split(3))
            {
                Console.WriteLine($"Fetched chunk: {x.Count}");
                Console.WriteLine($"Fetched total: {i += x.Count}");
            }
            //Fetched chunk: 3
            //Fetched total: 3
            //Fetched chunk: 3
            //Fetched total: 6
            //Fetched chunk: 3
            //Fetched total: 9
            //Fetched chunk: 1
            //Fetched total: 10
         

            Task.Run(
                () =>
                    {
                        foreach (var x in Enumerable.Range(0, 10))
                        {
                            queue.Add(x);
                        }
                    });

            i = 0;
            foreach (var element in queue.GetConsumingEnumerable(
                new CancellationTokenSource(3000).Token).Split(3))
            {
                Console.WriteLine($"Fetched chunk: {element.Count}");
                Console.WriteLine($"Fetched total: {i += element.Count}");
            }

            //Fetched chunk: 3
            //Fetched total: 3
            //Fetched chunk: 3
            //Fetched total: 6
            //Fetched chunk: 3
            //Fetched total: 9
        }

Apparently the last chunk is "dropped" if it contains fewer elements than the chunk size. Any ideas?

Answer:

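You need to call CompleteAdding, which tells GetConsumingEnumerable that the producer will not add any more elements. Without that call, the enumerator keeps blocking while it waits for another element to fill the last chunk, so the partial chunk is never returned. In your test the wait only ends when the 3000 ms cancellation token fires, and the partial chunk is lost.

With the producer changed as follows, the missing lines are printed as well.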

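        Task.Run(() =>
        {
            // Same 10 elements as in the question's test.
            foreach (var x in Enumerable.Range(0, 10))
            {
                queue.Add(x);
            }

            // Signal that no more items will be added, so the consumer can finish.
            queue.CompleteAdding();
        });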

See the documentation for BlockingCollection<T>.GetConsumingEnumerable for more information.
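
If the producer runs for a long time and you do not want a partial chunk to wait until CompleteAdding is called, one option is to batch directly on the collection with a timeout. The following is only a minimal sketch, not something from the original post: the names ConsumeInBatches and maxWaitMilliseconds are made up for illustration. It relies on BlockingCollection<T>.TryTake(out T, int, CancellationToken), which returns false when the timeout elapses or when the collection is completed and empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionBatchExtensions
{
    // Hypothetical helper: yields batches of up to `size` items and flushes a
    // partial batch if no further item arrives within `maxWaitMilliseconds`.
    public static IEnumerable<List<T>> ConsumeInBatches<T>(
        this BlockingCollection<T> source,
        int size,
        int maxWaitMilliseconds,
        CancellationToken token = default)
    {
        // Wait without a time limit for the first item of each batch.
        // TryTake returns false once adding is completed and the collection is empty.
        while (source.TryTake(out var first, Timeout.Infinite, token))
        {
            var batch = new List<T>(size) { first };

            // Fill the rest of the batch, waiting a bounded time for each further item.
            while (batch.Count < size
                   && source.TryTake(out var next, maxWaitMilliseconds, token))
            {
                batch.Add(next);
            }

            yield return batch;
        }
    }
}
```

It would be used as `foreach (var chunk in queue.ConsumeInBatches(3, 100)) { ... }`. The trade-off is that a slow producer leads to chunks smaller than the requested size, and the loop still only ends after CompleteAdding has been called or the token is cancelled (in which case an OperationCanceledException is thrown).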
