How to chunkify an IEnumerable<T>, without losing/discarding items in case of failure?

Time:07-21

I have a producer-consumer scenario where the producer is an enumerable sequence of items (IEnumerable<Item>). I want to process these items in chunks/batches of 10 items each. So I decided to use the new (.NET 6) Chunk LINQ operator, as suggested in this question: Create batches in LINQ.

My problem is that sometimes the producer fails, and in this case the consumer of the chunkified sequence receives the error without first receiving a chunk with the last items that were produced before the error. So if for example the producer generates 15 items and then fails, the consumer will get a chunk with the items 1-10 and then will get an exception. The items 11-15 will be lost! Here is a minimal example that demonstrates this undesirable behavior:

static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}

Output:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

Online demo.

The desirable behavior would be to get a chunk with the values [11, 12, 13, 14, 15] before getting the exception.

My question is: Is there any way to configure the Chunk operator so that it prioritizes emitting data instead of exceptions? If not, how can I implement a custom LINQ operator, named for example ChunkNonDestructive, with the desirable behavior?

public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);

Note: Apart from the System.Linq.Chunk operator, I also experimented with the Buffer operator from the System.Interactive package, as well as the Batch operator from the MoreLinq package. Apparently they all behave the same way (destructively).


Update: Here is the desirable output of the above example:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

The difference is the line Consumed: [11, 12, 13, 14, 15], which is not present in the actual output.

CodePudding user response:

If you preprocess your source to make it stop when it encounters an exception, then you can use Chunk() as-is.

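public static class Extensions
{
    // Yields items from the source until it ends or throws. The exception (if any)
    // is passed to the optional callback instead of propagating to the consumer.
    public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception>? exceptionCallback = null)
    {
        // Dispose the enumerator when iteration ends or is abandoned.
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            try
            {
                if (!enumerator.MoveNext())
                {
                    break;
                }
            }
            catch (Exception e)
            {
                exceptionCallback?.Invoke(e);
                break;
            }
            // yield return is not allowed inside a try block that has a catch clause,
            // so the item is emitted here, outside of it.
            yield return enumerator.Current;
        }
    }
}

Usage:

    Exception? e = null;
    foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
    {
        Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
    }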

I feel like that keeps responsibilities separated nicely. If you want a helper that throws an exception instead of having to capture it yourself, you can use this as a component to simplify writing that helper:

    public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
    {
        Exception? e = null;
        var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
        foreach (var element in result)
        {
            yield return element;
        }
        if (e != null)
        {
            throw new InvalidOperationException("source threw an exception", e);
        }
    }

Note that this will throw a different exception than the one emitted by the producer. This lets you keep the stack trace associated with the original exception, whereas throw e would overwrite that stack trace.

You can tweak this according to your needs. If you need to catch a specific type of exception that you're expecting your producer to emit, it's easy enough to use the when contextual keyword with some pattern matching.

    try
    {
        foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
        {
            Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
        }
    }
    catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
    {
        Console.WriteLine(e.InnerException.ToString());
    }
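
If rethrowing the producer's own exception (rather than a wrapper) is preferred, one option is ExceptionDispatchInfo from System.Runtime.ExceptionServices, which rethrows a captured exception while keeping its original stack trace. The following is only a minimal sketch under that assumption; the names ChunkThenRethrow and StopAtFirstException are hypothetical and not part of the answer above:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;

public static class ChunkRethrowExtensions
{
    // Hypothetical variant of ChunkUntilFirstException: emits every chunk,
    // including the partial one, then rethrows the ORIGINAL exception.
    public static IEnumerable<T[]> ChunkThenRethrow<T>(this IEnumerable<T> source, int size)
    {
        ExceptionDispatchInfo? captured = null;
        var items = StopAtFirstException(source, e => captured = ExceptionDispatchInfo.Capture(e));
        foreach (T[] chunk in items.Chunk(size))
        {
            yield return chunk;
        }
        // Throw() rethrows the captured exception without discarding its stack trace.
        captured?.Throw();
    }

    // Hypothetical helper: ends the sequence at the first exception and reports it.
    private static IEnumerable<T> StopAtFirstException<T>(IEnumerable<T> source, Action<Exception> onError)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            bool hasNext;
            try
            {
                hasNext = enumerator.MoveNext();
            }
            catch (Exception e)
            {
                onError(e);
                hasNext = false;
            }
            if (!hasNext) yield break;
            yield return enumerator.Current;
        }
    }
}
```

With this variant the consumer can catch the producer's exception type directly, so the when filter on InnerException is not needed. The trade-off is the same as before: the exception only surfaces if the consumer keeps enumerating past the last chunk.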

CodePudding user response:

First off, a matter of semantics. There's nothing destructive in Chunk or Buffer or anything else; each of them just reads items from a source enumerable until the source ends or throws an exception. The only destructive thing in your code is you throwing exceptions, which behaves as expected (i.e., it unwinds the stack out of your generator, out of the LINQ functions and into a catch in your code, if any exists).

Also, it should be immediately obvious that every LINQ function will behave the same with regard to exceptions. That is in fact how exceptions work, and working around them to support your use case is relatively expensive: you'll need to catch and swallow exceptions for every item you generate. This, in my humble opinion, is incredibly bad design, and you'd be fired on the spot if you worked for me and did that.

With all that out of the way, writing a BadDesignChunk like that is trivial (if expensive):

public static IEnumerable<IEnumerable<TSource>> BadDesignChunk<TSource>(this IEnumerable<TSource> source, int size)
{
    Exception caughtException = default;
    var chunk = new List<TSource>();
    using var enumerator = source.GetEnumerator();
    
    while(true)
    {
        while(chunk.Count < size)
        {
            try
            {
                if(!enumerator.MoveNext())
                {
                    // end of the stream, send what we have and finish
                    goto end;
                }
            }
            catch(Exception ex)
            {
                // exception, send what we have and finish
                caughtException = ex;
                goto end;
            }
            
            chunk.Add(enumerator.Current);
        }
        
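        // chunk full, send a copy (the list itself is reused for the next chunk)
        yield return chunk.ToArray();
        chunk.Clear();
    }
    
    end:
    if(chunk.Count > 0)
        yield return chunk.ToArray();
    if(caughtException is not null)
        throw caughtException; // note: rethrowing this way resets the exception's stack trace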
}

See it in action here.

CodePudding user response:

I can't see how that can be done in a sensible way (without ignoring exceptions).

Based on how I read your post, I believe you are asking for an impossible thing: preserve the exception, delay it until some later time, and still guarantee that it will be thrown. If you are OK with dropping the producer's exception from time to time, I'd say you should be OK with ignoring it always, and a simple wrapper that ignores exceptions would be sufficient.

Here is how exceptions would be dropped in the hypothetical implementation: to build a chunk you need to enumerate the items and save the exception raised for the last item. At this point there are two places where the exception can be thrown: while iterating the chunk (which is not guaranteed to happen if something like First is used, or if another exception is thrown while processing the existing items) or while getting the next chunk (which is not guaranteed to happen either, for the same reasons). If you somehow (through code reviews and testing?) guarantee that all sequences are iterated to the end, you run into a problem with the order of exceptions: the first exception (from the producer) will surface after any exceptions thrown while processing the chunks.

The alternative approach one might think of, "what about reading items directly while iterating the chunk?" (which would throw the exception at the appropriate time, when you are about to get that item), is impossible for a regular "chunk" API. That API gives you multiple "pointers" into the source sequence that could all be advanced at the same time. As a result, the only option is to materialize each chunk (thus triggering the exception), so that there is only one live "pointer" into the source sequence (pointing just before the first element of the next chunk to be read).
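
To make the "dropped exception" scenario concrete, here is a small sketch. It assumes a hypothetical operator, ChunkDeferringException, that emits the partial chunk first and throws the saved exception only when the consumer asks for the chunk after it; Produce, FailureObserved and the class name are likewise made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredExceptionDemo
{
    // Hypothetical producer: yields 1..5, then fails on the next MoveNext.
    public static IEnumerable<int> Produce()
    {
        for (int i = 1; i <= 5; i++) yield return i;
        throw new Exception("Oops!");
    }

    // Sketch: emits the partial chunk first and throws the saved exception
    // only when the consumer asks for the chunk after it.
    public static IEnumerable<int[]> ChunkDeferringException(IEnumerable<int> source, int size)
    {
        var buffer = new List<int>(size);
        Exception? deferred = null;
        using var enumerator = source.GetEnumerator();
        bool hasNext = true;
        while (hasNext)
        {
            try { hasNext = enumerator.MoveNext(); }
            catch (Exception e) { deferred = e; hasNext = false; }

            if (hasNext) buffer.Add(enumerator.Current);
            if (buffer.Count == size || (!hasNext && buffer.Count > 0))
            {
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }
        if (deferred is not null)
            throw new InvalidOperationException("source threw an exception", deferred);
    }

    // Returns true if the producer's failure reached the consumer.
    public static bool FailureObserved(Func<IEnumerable<int[]>, object> consume)
    {
        try { consume(ChunkDeferringException(Produce(), 10)); return false; }
        catch (InvalidOperationException) { return true; }
    }
}
```

Under these assumptions, FailureObserved(chunks => chunks.ToList()) sees the failure because it enumerates to the end, whereas FailureObserved(chunks => chunks.First()) does not: First stops after the partial chunk, the iterator is disposed, and the saved exception is never thrown.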
