原则36:理解 I/O 受限制(Bound)操作 PLINQ 的使用

By D.S.Qiu

尊重他人的劳动,支持原创,转载请注明出处:http://dsqiu.iteye.com

Parallel Task 库看起来是 CPU 受限制操作进行优化。虽然这是类库的一个核心任务,但是它在 I/O 受限制操作也做的很好。实际上, Parallel Task 库的处理 I/O 受限制操作的设计更多是默认实现的。它会根据你线程的繁忙程度更新分配线程的数量。更多阻塞的线程(等待 I/O 操作)会导致线程分配更多线程完成手头的任务。

和其他并行扩展一样,你可以使用方法调用,或者 LINQ 查询语法进入并行执行模型。I/O 受限制操作和 CPU 受限制操作的并行执行表现会有些不同。你经常需要比核心更多的线程,因为 I/O 受限制操作花费更多时间等待它们的外部事件。PLINQ 也为这个习惯提供了框架。

下面这段代码是从一系列网站下载数据:

foreach (var url in urls) 
{
    var result = new WebClient().DownloadData(url); 
    UseResult(result);
}

DownloadData() 的调用发起了 web 的同步请求而且知道所有数据被检索下来。这个算法花费太多时间等待。你可以使用并行 for 循环改变为并行模型:

Parallel.ForEach(urls, url => 
{
    var result = new WebClient().DownloadData(url); 
    UseResult(result);
});

Parallel.ForEach() 会选择进入并行处理模型。这个版本花的时间比顺序进行版本少多了。实际上,在我的双核机器上,加速和 url 的集合的元素个数成正比。线程花更多时间在等待,所以 Parallel Task 库会创建更多线程。

你可以使用 PLINQ 和查询语法产生一样的结果:

var results = from url in urls.AsParallel() 
    select new WebClient().DownloadData(url);
results.ForAll(result => UseResult(result));

PLINQ 和 Parallel Task 库支持的 Parallel.ForEach() 会有一点不同。PLINQ 使用固定的线程数量,然而 AsParallel() 会更加吞吐量的增加和减少活动线程的数量。你可以使用 ParallelEnumeralbe.WithDegreeOfParallelism() 控制线程的数量(查看原则35)。但是 Parallel.ForEach() 会为你管理。 Parallel.ForEach() 当 I/O 受限制和 CPU 受限制操作混合时表现的更好。 Parrallel.ForEach() 会基于当前加载管理活动线程的数量。当更多线程阻塞等待 I/O 操作,它就会创建更多的线程以增加吞吐量。当工作线程增多,它就会让活动的线程数量减少以最小化上下文的切换。

上面演示的代码不是真正的异步。它只是充分利用多线程来并行执行些任务。但是围绕这个程序它会等待所有 web 请求完成才继续做其他工作。 Parallel Task 库提供其他基本的异步模式的实现。其中常见的模式之一就是启动多个 I/O 受限制任务并且知道这些结果返回才执行一些操作。完美地,我更喜欢像这样写:

urls.RunAsync( 
    url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));

这里使用 startDownload() 方法开始每个 URL 的下载。随着每个下载完成,finsishDownload() 会被执行。一旦所有下载完成, RunAsync() 就完成。 Parallel Task 库完成这个自然做了很多工作,所以我们仔细检查下。最好开始的地方是 RunAsync 方法:

public static void RunAsync<T, TResult>( this IEnumerable<T> taskParms, Func<T, Task<TResult>> taskStarter, Action<Task<TResult>> taskFinisher)
{
    taskParms.Select(parm => taskStarter(parm)).AsParallel().ForAll(t => t.ContinueWith(t2 => taskFinisher(t2)));
}

这个方法为每个输入参数创建一个任务。Select() 方法返回任务序列。下一步,你使用 AsParallel() 并行处理结果。对于每个单一的任务,你会调用后续处理方法。 Task<T> 类表示一个任务,并包含这个任务输入和输出值的属性。 ContinueWith() 是 Task<T> 的其中一个方法。它会在任务完成后调用,允许你对已完成的任务进行处理。在 RunAsync 方法中,它传入 Task 对象参数调用 taskFinisher 。ForAll() 使用 Inverted Enumeration 算法,它会阻塞知道所有任务完成。

我们深入探讨这个模式,理解开始下载方法和汇报下载完成的方法。 finishDownload 方法很简单,我在完成时输出:

private static void finishDownload(string url, byte[] bytes) 
{
    Console.WriteLine("Read {0} bytes from {1}", bytes.Length, url);
}

StartDownload 比 Parallel Task 库的接口复杂一些。具体的类型用来帮助支持 Task 接口。我更想抽象出来,但是针对具体的任务的处理不同类型会有点区别。实际上, Parallel Task 库为 .NET BCL 在此版本之前的很多不同异步模式之上提出了一套通用的接口。

private static Task<byte[]> startDownload(string url) 
{
    var tcs = new TaskCompletionSource<byte[]>(url); 
    var wc = new WebClient(); 
    wc.DownloadDataCompleted += (sender, e) => 
    {
        if (e.UserState == tcs) 
        {
            if (e.Cancelled) 
                tcs.TrySetCanceled();
            else if (e.Error != null) 
                tcs.TrySetException(e.Error);
            else 
                tcs.TrySetResult(e.Result);
        } 
    }; 
    wc.DownloadDataAsync(new Uri(url), tcs); 
    return tcs.Task;
}

这个方法混合了 Task 代码和从 URL 下载的代码,所以我们必须非常认真地梳理一遍。首先,它为这个任务创建一个 TaskCompletionSource 对象。TaskCompeletionSource 对象使得任务的创建和完成分离了。这里是非常重要的,因为你使用 WebClient 类的异步方法创建这个任务。TaskCompletionSource 的参数是这个任务的返回的结果。

WebClient 类使用基于事件的异步模式(EAP)。这意味着你像一个事件注册处理器,当这个异步操作完成时,事件就会被触发。当事件触发 startDownload() 吧任务完成信息存储在 TaskCompletionSource 中。TaskSheduler 选择一个任务并开始下载。这个方法返回的 Task 对象内嵌在 TaskCompletionSource 中,这样当任务完成时事件结果就会被处理。

在这些工作后,web 下载另一个线程异步开始。当下载完成, DownloadDataCompleted 事件就会被触发。事件处理会设置 TaskCompletionSource 的完成状态。在 TaskCompletionSouce 嵌入 Task 对象表示它已经完成。

现在,任务会调用 ContinueWith() ,报告下载的结果。花了点功夫解开这些细节,但是在接解开过一次后,这个模式就不会那么难理解。

上面展示的例子就是底层使用的基于事件异步模式的正确表达习惯。 .NET 库的其他领域使用异步编程模型( APM) 模式。在这个模式,一些操作 Foo 你调用 BeginFoo() ,会返回一个 IAsyncResult 对象。一旦操作完成,你可以调用 EndFoo() ,传入参数就是这个 IAsyncResult 对象。Parallel Task 库你可以使用 Task<TResult>.Factory.FromAsync() 方法实现这个模式。

底层的原理和我下载 web 数据的版本是类似的。区别在于你提供了不同的委托去匹配使用的异步方法来创建任务。

Parallel Task 库提供一系列方法,使得 I/O 受限制操作和 CPU 受限制一样工作。使用 Task 类,你可以对 I/O 受限制操作或混合了 I/O 和 CPU 受限制操作支持各种异步模式。并行任务还是不那么简单,但是 Parallel Task 库和 PLINQ 比之前的库提供更好的语言层次对异步编程的支持。随着我们程序会更多访问不同机器的数据和更多线程等待远程机器的响应,这会变得更重要。

小结:

一般的多线程都是指的是 CPU 受限制的操作进行并行优化,对于 I/O 也同样存在现在,要等到外部事件响应,PLINQ 和 Parallel Task 库对 I/O 受现在操作也像 CPU 受限制一样提供支持。

欢迎各种不爽,各种喷,写这个纯属个人爱好,秉持”分享“之德!

有关本书的其他章节翻译请点击查看,转载请注明出处,尊重原创!

如果您对D.S.Qiu有任何建议或意见可以在文章后面评论,或者发邮件([email protected])交流,您的鼓励和支持是我前进的动力,希望能有更多更好的分享。

转载请在文首注明出处:http://dsqiu.iteye.com/blog/2088750

更多精彩请关注D.S.Qiu的博客和微博(ID:静水逐风)