Exception is lost while consuming a PLINQ query

Time:12-01

I observed a weird behavior while experimenting with a PLINQ query. Here is the scenario:

  • There is a source IEnumerable<int> sequence that contains the two items 1 and 2.
  • A Parallel LINQ Select operation is applied on this sequence, projecting each item to itself (x => x).
  • The resulting ParallelQuery<int> query is consumed immediately with a foreach loop.
  • The selector lambda of the Select successfully projects the item 1.
  • The consuming foreach loop throws an exception for the item 1.
  • The selector lambda throws an exception for the item 2, after a small delay.

What happens next is that the consuming exception is lost! Apparently it is shadowed by the exception thrown afterwards in the Select. Here is a minimal demonstration of this behavior:

ParallelQuery<int> query = Enumerable.Range(1, 2)
    .AsParallel()
    .Select(x =>
    {
        if (x == 2) { Thread.Sleep(500); throw new Exception($"Oops!"); }
        return x;
    });

try
{
    foreach (int item in query)
    {
        Console.WriteLine($"Consuming item #{item} started");
        throw new Exception($"Consuming item #{item} failed");
    }
}
catch (AggregateException aex)
{
    Console.WriteLine($"AggregateException ({aex.InnerExceptions.Count})");
    foreach (Exception ex in aex.InnerExceptions)
        Console.WriteLine($"- {ex.GetType().Name}: {ex.Message}");
}
catch (Exception ex)
{
    Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
}

Output:

Consuming item #1 started
AggregateException (1)
- Exception: Oops!

Live demo.

Chronologically the consuming exception happens first, and the PLINQ exception happens later. So my understanding is that the consuming exception is more important, and it should be propagated with priority. Nevertheless the only exception that is surfaced is the one that occurs inside the PLINQ code.

My question is: why is the consuming exception lost, and is there any way that I can fix the query so that the consuming exception is propagated with priority?

The desirable output is this:

Consuming item #1 started
Exception: Consuming item #1 failed

CodePudding user response:

I think what you are seeing is the result of the compiler translating the foreach into a while (MoveNext()) loop wrapped in a try/finally that disposes of the enumerator. When the exception is thrown inside the loop body, the finally block runs and the Dispose() of the enumerator causes all the Select threads to finish. Their failure then surfaces as an exception thrown inside the finally block, and that exception replaces the initial one, as discussed here. You need to use your own loop and a try/catch if you want to prevent this, though I think the Microsoft recommendation would be to use a try/catch in the Select to be closer to the source of the exception.
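To see the masking effect in isolation, here is a minimal sketch that does not use PLINQ at all. Source and Consume are hypothetical names made up for this illustration: Source is an iterator whose finally block throws, which stands in for an enumerator whose Dispose() fails, and Consume runs an ordinary foreach over it and reports which exception escapes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source: its finally block runs inside the enumerator's Dispose()
// when the consumer leaves the loop early, and it throws there.
IEnumerable<int> Source()
{
    try
    {
        yield return 1;
        yield return 2;
    }
    finally
    {
        throw new InvalidOperationException("Dispose failed");
    }
}

// Returns the message of whichever exception escapes the foreach.
string Consume()
{
    try
    {
        // The compiler expands this roughly to:
        //   var e = Source().GetEnumerator();
        //   try { while (e.MoveNext()) { ...body... } } finally { e.Dispose(); }
        foreach (int item in Source())
            throw new Exception($"Consuming item #{item} failed");
        return "no exception";
    }
    catch (Exception ex)
    {
        return ex.Message;
    }
}

Console.WriteLine(Consume()); // prints "Dispose failed"
```

The exception from the loop body never reaches the catch block, because the one thrown during Dispose() takes its place while the stack is unwinding.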

Here is a modification of your existing code that replaces the foreach with the compiler-generated expansion of foreach using an enumerator. (I use LINQPad to see the C# 1.0 equivalent code / IL code from the compiler.)

You can capture any exceptions during the Dispose of the enumerator and then bundle them up with the original exception into an AggregateException when you catch them.

I wrapped the boilerplate into an extension method to replace the normal foreach:

var b = true;
var query = Enumerable.Range(1, 3)
    .AsParallel()
    .Select(x => {
        Thread.Sleep(50 * (x - 1));
        Console.WriteLine($"Select({x})");
        if (x >= 2) {
            throw new Exception($"Oops {x}!");
        }
        return x;
    });

try {
    query.ForEachAggregatingExceptions(item => {
        Console.WriteLine($"Consuming item #{item} started");
        if (b) {
            throw new Exception($"Consuming item #{item} failed");
        }
    });
}
catch (AggregateException aex) {
    Console.WriteLine($"AggregateException ({aex.InnerExceptions.Count})");
    foreach (Exception ex in aex.InnerExceptions)
        Console.WriteLine($"- {ex.GetType().Name}: {ex.Message}");
}
catch (Exception ex) {
    Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
}

public static class ParallelQueryExt {
    public static void ForEachAggregatingExceptions<T>(this ParallelQuery<T> pq, Action<T> processFn) {
        Exception FirstException = null;
        var e = pq.GetEnumerator();
        try {
            while (e.MoveNext())
                processFn(e.Current);
        }
        catch (Exception ex) {
            FirstException = ex;
        }
        finally {
            if (e != null) {
                try {
                    e.Dispose();
                }
                catch (AggregateException aex) { // combine exceptions from Dispose with FirstException if any
                    if (FirstException != null) {
                        throw new AggregateException(aex.InnerExceptions.Prepend(FirstException));
                    }
                    else
                        throw;
                }
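                catch (Exception ex) { // combine single exception from Dispose with FirstException if any
                    if (FirstException != null) {
                        // FirstException goes first; a null entry would make the constructor throw
                        throw new AggregateException(FirstException, ex);
                    }
                    else
                        throw;
                }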
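                if (FirstException != null) // re-throw FirstException if no others occurred, keeping its original stack trace
                    System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(FirstException).Throw();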
            }
        }
    }
}

P.S. The b variable and the if statement prevent the compiler from reducing the while loop to an if, which it would otherwise do because it can figure out that the throw prevents the loop from executing more than one pass.
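For completeness, here is a rough sketch of the other option mentioned above, catching inside the Select so that the selector never throws. It rests on an assumption that is not part of your code: the query is changed to yield a (Value, Error) tuple, so a failure in the selector travels as data and the consumer decides what to do with it. The tuple shape and the outcome variable are made up for this illustration.

```csharp
using System;
using System.Linq;
using System.Threading;

// Assumption: selector failures are returned as data instead of being thrown.
ParallelQuery<(int Value, Exception Error)> query = Enumerable.Range(1, 2)
    .AsParallel()
    .Select(x =>
    {
        try
        {
            if (x == 2) { Thread.Sleep(500); throw new Exception("Oops!"); }
            return (Value: x, Error: (Exception)null);
        }
        catch (Exception ex)
        {
            // Caught close to the source, so nothing is left to surface during Dispose().
            return (Value: x, Error: ex);
        }
    });

string outcome;
try
{
    foreach (var (value, error) in query)
    {
        if (error != null)
        {
            Console.WriteLine($"Select failed for item #{value}: {error.Message}");
            continue;
        }
        Console.WriteLine($"Consuming item #{value} started");
        throw new Exception($"Consuming item #{value} failed");
    }
    outcome = "Completed";
}
catch (Exception ex)
{
    outcome = $"{ex.GetType().Name}: {ex.Message}";
}
Console.WriteLine(outcome);
```

Since the selector no longer throws, the consuming exception should be the one that reaches the catch block. The trade-off is that a selector error for an item the loop never reaches is silently dropped once the consumer bails out.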
