原则37:构造并行算法的异常考量

By D.S.Qiu

尊重他人的劳动,支持原创,转载请注明出处:http://dsqiu.iteye.com

前面两个原则幸福地忽略了任何子线程运行出错的可能性。这显然不是现实世界所进行的。异常会在你的子线程发生,你不得不转向去收拾残局。当然,后台线程的异常在某种程度上增加复杂性。异常不能继续调用线程边界的函数栈。而是,如果在线程启动方法出现异常,这个线程就会终止。没有任何方式调用线程检索错误,或者对异常做任何事。更重要的是,如果出现异常你的并行算法就必须支持回滚,你不得不理解异常出现的副作用并且能从异常恢复过来。每个算法都有不同需求,所以在并行的环境中没有通用的答案处理异常。我只能提供的指导就是:对于特定的应用你可以使用最好的策略。

我们从上个原则的异步下载开始说。最简单策略就是没有副作用,并且从所有的 web 主机上持续下载不用考虑其中一个会失败。并行操作使用新的 AggregateException 类处理并行操作的异常。 AggregateException 的 InnerException 属性包含所有在并行操作可能产生的异常。在这个过程有好几个方式处理这个异常。首先,我会演示一个更普遍的情况,怎么在外部处理子任务产生的错误。

在上一个原则的 RunAsync() 方法在多个并行操作使用。这意味你要捕获 AggregateException 的 InnerException 集合的异常。越多并行操作,嵌套就越深。因为并行操作有不同的构成,你应该防止多次复制在异常集合的元素异常。我修改 RunAsync() 以处理可能的错误:

try 
{
    urls.RunAsync( url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));
} 
catch (AggregateException problems) 
{
    ReportAggregateError(problems); 
}
private static void ReportAggregateError(AggregateException aggregate) 
{
    foreach (var exception in aggregate.InnerExceptions) 
        if (exception is AggregateException)
            ReportAggregateError( exception as AggregateException);
        else
            Console.WriteLine(exception.Message);
}

ReportAggregateError 输出所有不是 AggregateException 自身异常的信息。当然,这会掩盖所有异常,不管你是否有没有预料到。这是相当危险的。相反,你需要处理你可以从中恢复的异常,或者重抛出其他异常。

这里有很多集合递归,所以有一个试用的函数式很有意义的。泛型方法必须知道哪些异常类你要处理,哪些异常你是不期望的并且你要处理那些你想要处理的异常。你需要为这个方法确定要处理的异常类型和处理异常的代码。这是简单的类型和 Action<T> lambda 表达式的字典。并且,如果处理没有处理 InnerException 集合的每个异常,清楚哪些异常出现异常。这说明你要重新抛出原来的异常。下面是新的 Runsync 代码:

try 
{
    urls.RunAsync(url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));
} 
catch (AggregateException problems) 
{
    var handlers = new Dictionary<Type, Action<Exception>>(); 
    handlers.Add(typeof(WebException),ex => Console.WriteLine(ex.Message));
    if (!HandleAggregateError(problems, handlers)) 
        throw;
}

HandleAggregateError 方法递归查看每个异常。如果异常是预料的,处理器会被调用。否则, HandleAggregateError 返回 false ,说明这类异常不能被正确处理:

private static bool HandleAggregateError(AggregateException aggregate, Dictionary<Type, Action<Exception>> exceptionHandlers)
{
    foreach (var exception in aggregate.InnerExceptions) 
        if (exception is AggregateException) 
            return HandleAggregateError( exception as AggregateException, exceptionHandlers);
        else if (exceptionHandlers.ContainsKey( exception.GetType()))
        {
            exceptionHandlers[exception.GetType()] (exception);
        } 
        else
            return false; 
    return true;
}

这点代码看着有些密集,但是并不难。当它传入一个 AggregateException ,它会对子列递归评估。当遍历到任何其他异常,它会查询字典。如果处理器 Action<> 已经被注册,就会调用这个处理器。如果没有,就会理解返回 false ,即发现一个不能处理的异常。

你会奇怪为什么当没有注册处理器抛出的是元素的 AggregateException 而不是单一的异常。抛出集合中的单一异常会丢失重要信息。 InnerException 可能包含很多异常。可能会有多个异常是没有预料到的。你必须返回这个集合而避免丢失太多信息。很多情况,AggregateException 的 InnerException 集合只有一个异常。然而,你不能那样写代码因为当你想要额外的信息,它却不在那。

当然,这会感觉有一点丑陋。还有没有更好的防止异常出现使得任务离开运行的后台工作的办法。在几乎所有情况,这是更好的。修改代码使得正在运行的后台任务确保没有异常能停止这个后台任务。当你使用 TaskCompletionSource<> 类,就说明你没有调用 TrySetException() ,而是确保每个任何调用 TrySetResult() 表示完成。这就有了下面对 startDownload() 的修改。但是,正如和我前面说的,你不能只是捕获每个异常。你应该只捕获可以从中恢复的异常。在这个例子中,你可以从 WebException 恢复,这个异常出现因为远程主机不可访问。其他异常类型可能表明更严重的问题。那些会持续产生的异常会终止所有处理。 startDownload 方法有了下面的修改:

private static Task<byte[]> startDownload(string url) 
{
    var tcs = new TaskCompletionSource<byte[]>(url); 
    var wc = new WebClient(); 
    wc.DownloadDataCompleted += (sender, e) => 
    {
        if (e.UserState == tcs) 
        {
            if (e.Cancelled) 
                tcs.TrySetCanceled();
            else if (e.Error != null)
            {
                if (e.Error is WebException)
                    tcs.TrySetResult(new byte[0]);
                else
                    tcs.TrySetException(e.Error); 
            } 
            else
                tcs.TrySetResult(e.Result); 
        }
    }; 
    wc.DownloadDataAsync(new Uri(url), tcs); 
    return tcs.Task;
}

WebException 的返回说明0字节数组读取,而且所有其他异常会通过正常的通道抛出。对的,也就是说当 AggregateException 被抛出仍可以继续对正在发生进行处理。很可能你只是把它们当做致命错误,而你的后台任务可以继续处理其他错误。但是你需要理解所有不同类型的异常。

当然,如果你使用 LINQ 语法,后台任务的错误有引起其他问题。记得原则35我描述了三条和并行算法的区别。在所有情况下,使用 PLINQ 和正常的懒评估会有些变化,而这些变化是你在 PLINQ 算法处理异常时必须考虑的。请记住,通常,一个查询只有在其他代码请求这个查询产生的项时才执行。这当然不是 PLINQ 的工作。后台线程运行产生结果,而且另一个任务组合最后的结果序列。它不能立即评估。查询的结果不是立即产生的。然而,后台线程只要调用运行就会开始产生结果。现在,你意味着必须改变异常处理的代码。在典型的 LINQ 查询,你可以将使用查询结果的代码放在 try/catch 块内。这不需要包裹定义 LINQ 查询表达式的代码:

var nums = from n in data 
    where n < 150 
    select Factorial(n);
try 
{
    foreach (var item in nums)
        Console.WriteLine(item); 
} 
catch (InvalidOperationException inv) 
{
// elided 
}

一旦涉及 PLINQ ,你必须在 tyr/cathc 块中封闭查询的定义。而且,当然,记住如果你使用 PLINQ ,你必须捕获 AggregateException 而不是无论你原来期望的是什么异常。不管你使用 Pipelining ,Stop&Go ,或者 Inverted Enumeration 算法都是正确的。

异常在任何算法中都是复杂的。并行任务引起更多并发症。 Parallel Task 库使用 AggregateExceptoion 类持有并行算法排除的所有异常。只要有一个后台线程抛出一个异常,其他后端操作都会被停止。你最好的计划是确保在你的并行任务执行代码时没有任何异常抛出。即使这,其他你没有期望的异常也有可能在某些地方抛出。这意味着你必须处理 AggregateException 以控制线程初始化所有后台工作。

小结:

异常,无论什么时候都要考虑进来,好吧,还没有用到 LINQ 和 PLINQ (工作没有这个需求),暂且没有深入的感受。

跑步去,加油,加油!

欢迎各种不爽,各种喷,写这个纯属个人爱好,秉持”分享“之德!

有关本书的其他章节翻译请点击查看,转载请注明出处,尊重原创!

如果您对D.S.Qiu有任何建议或意见可以在文章后面评论,或者发邮件([email protected])交流,您的鼓励和支持是我前进的动力,希望能有更多更好的分享。

转载请在文首注明出处:http://dsqiu.iteye.com/blog/2088794

更多精彩请关注D.S.Qiu的博客和微博(ID:静水逐风)