目录

 

七、并行编程与PLINQ

要使用多线程开发,必须非常熟悉Thread的使用,而且在开发过程中可能会面对很多未知的问题。为了简化开发,.NET 4.0 特别提供一个并行编程库System.Threading.Tasks,它可以简化并行开发,你无需直接跟线程或线程池打交道,就可以简单建立多线程应用 程序。此外,.NET还提供了新的一组扩展方法PLINQ,它具有自动分析查询功能,如果并行查询能提高系统效率,则同时运行,如果查询未能从并行查询中 受益,则按原顺序查询。下面将详细介绍并行操作的方式。

 

7.1 泛型委托

使用并行编程可以同时操作多个委托,在介绍并行编程前先简单介绍一下两个泛型委托System.Func<>与System.Action<>。

Func<>是一个能接受多个参数和一个返回值的泛型委托,它能接受0个到4个输入参数, 其中 T1,T2,T3,T4 代表自定的输入类型,TResult为自定义的返回值。

public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)

Action<>与Func<>十分相似,不同在于Action<>的返回值为void,Action能接受1~16个参数

public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

 

7.2 任务并行库(TPL)

System.Threading.Tasks中的类被统称为任务并行库(Task Parallel Library,TPL),TPL使用CLR线程池把工作分配到CPU,并能自动处理工作分区、线程调度、取消支持、状态管理以及其他低级别的细节操作,极大地简化了多线程的开发。

注意:TPL比Thread更具智能性,当它判断任务集并没有从并行运行中受益,就会选择按顺序运行。但并非所有的项目都适合使用并行开发,创建过多并行任务可能会损害程序的性能,降低运行效率。

TPL包括常用的数据并行与任务并行两种执行方式:

7.2.1 数据并行

数据并行的核心类就是System.Threading.Tasks.Parallel,它包含两个静态方法 Parallel.For 与 Parallel.ForEach, 使用方式与for、foreach相仿。通过这两个方法可以并行处理System.Func<>、 System.Action<>委托。

以下一个例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法对List<Person>进行并行查询。

假设使用单线程方式查询3个Person对象,需要用时大约6秒,在使用并行方式,只需使用2秒就能完成查询,而且能够避开Thread的繁琐处理。

1     class Program 2     { 3         static void Main(string[] args) 4         { 5             //设置最大线程数  6             ThreadPool.SetMaxThreads(1000, 1000); 7             //并行查询  8             Parallel.For(0, 3,n => 9                 {10                     Thread.Sleep(2000);  //模拟查询11                     ThreadPoolMessage(GetPersonList()[n]);12                 });13             Console.ReadKey();14         }15 16         //模拟源数据 17         static IList
GetPersonList()18 {19 var personList = new List
();20 21 var person1 = new Person();22 person1.ID = 1;23 person1.Name = "Leslie";24 person1.Age = 30;25 personList.Add(person1);26 ...........27 return personList;28 }29 30 //显示线程池现状 31 static void ThreadPoolMessage(Person person)32 {33 int a, b;34 ThreadPool.GetAvailableThreads(out a, out b);35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +37 " CompletionPortThreads is :{5}\n",38 person.ID, person.Name, person.Age,39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());40 41 Console.WriteLine(message);42 }43 }

观察运行结果,对象并非按照原排列顺序进行查询,而是使用并行方式查询。

 

若想停止操作,可以利用ParallelLoopState参数,下面以ForEach作为例子。

public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
其中source为数据 集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState参数当中包含有 Break()和 Stop()两个方法都可以使迭代停止。Break的使用跟传统for里面的使用方式相似,但因为处于并行处理当中,使用Break并不能保证所有运行能 立即停止,在当前迭代之前的迭代会继续执行。若想立即停止操作,可以使用Stop方法,它能保证立即终止所有的操作,无论它们是处于当前迭代的之前还是之 后。

class Program     {          static void Main(string[] args)          {              //设置最大线程数               ThreadPool.SetMaxThreads(1000, 1000);                //并行查询               Parallel.ForEach(GetPersonList(), (person, state) =>                  {                      if (person.ID == 2)                          state.Stop();                      ThreadPoolMessage(person);                  });              Console.ReadKey();          }            //模拟源数据           static IList
GetPersonList() { var personList = new List
(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } //显示线程池现状 static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

观察运行结果,当Person的ID等于2时,运行将会停止。

 

当要在多个线程中调用本地变量,可以使用以下方法:

public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
其中第一个参数为数据集;
第二个参数是一个Func委托,用于在每个线程执行前进行初始化;
第 三个参数是委托Func<Of T1,T2,T3,TResult>,它能对数据集的每个成员进行迭代,当中T1是数据集的成员,T2是一个ParallelLoopState对 象,它可以控制迭代的状态,T3是线程中的本地变量;
第四个参数是一个Action委托,用于对每个线程的最终状态进行最终操作。

在以下例子中,使用ForEach计算多个Order的总体价格。在ForEach方法中,首先把参数初始化为0f,然后用把同一个Order的多 个OrderItem价格进行累加,计算出Order的价格,最后把多个Order的价格进行累加,计算出多个Order的总体价格。

1     public class Order 2     { 3         public int ID; 4         public float Price; 5     } 6  7     public class OrderItem 8     { 9         public int ID;10         public string Goods;11         public int OrderID;12         public float Price;13         public int Count;14     }15 16     class Program17     {18         static void Main(string[] args)19         {20             //设置最大线程数 21             ThreadPool.SetMaxThreads(1000, 1000);22             float totalPrice = 0f;23             //并行查询 24             var parallelResult = Parallel.ForEach(GetOrderList(),25                      () => 0f,   //把参数初始值设为0 26                      (order, state, orderPrice) =>27                      {28                          //计算单个Order的价格 29                          orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID)30                               .Sum(item => item.Price * item.Count);31                          order.Price = orderPrice;32                          ThreadPoolMessage(order);33                          34                          return orderPrice;35                      },36                     (finallyPrice) =>37                     {38                         totalPrice += finallyPrice;//计算多个Order的总体价格 39                     }40                 );41             42             while (!parallelResult.IsCompleted)43                 Console.WriteLine("Doing Work!");44 45             Console.WriteLine("Total Price is:" + totalPrice);46             Console.ReadKey();47         }48         //虚拟数据49         static IList
GetOrderList()50 {51 IList
orderList = new List
();52 Order order1 = new Order();53 order1.ID = 1;54 orderList.Add(order1);55 ............56 return orderList;57 }58 //虚拟数据59 static IList
GetOrderItem()60 {61 IList
itemList = new List
();62 63 OrderItem orderItem1 = new OrderItem();64 orderItem1.ID = 1;65 orderItem1.Goods = "iPhone 4S";66 orderItem1.Price = 6700;67 orderItem1.Count = 2;68 orderItem1.OrderID = 1;69 itemList.Add(orderItem1);70 ...........71 return itemList;72 }73 74 //显示线程池现状 75 static void ThreadPoolMessage(Order order)76 {77 int a, b;78 ThreadPool.GetAvailableThreads(out a, out b);79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" +80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" +81 " CompletionPortThreads is:{4}\n",82 order.ID, order.Price,83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());84 85 Console.WriteLine(message);86 }87 }

运行结果

 

 7.2.2 任务并行

在TPL当中还可以使用Parallel.Invoke方法触发多个异步任务,其中 actions 中可以包含多个方法或者委托,parallelOptions用于配置Parallel类的操作。

public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
下面例子中利用了Parallet.Invoke并行查询多个Person,actions当中可以绑定方法、lambda表达式或者委托,注意绑定方法时必须是返回值为void的无参数方法。

class Program     {         static void Main(string[] args)         {             //设置最大线程数              ThreadPool.SetMaxThreads(1000, 1000);                          //任务并行              Parallel.Invoke(option,                 PersonMessage,                  ()=>ThreadPoolMessage(GetPersonList()[1]),                   delegate(){                     ThreadPoolMessage(GetPersonList()[2]);                 });             Console.ReadKey();         }          static void PersonMessage()         {             ThreadPoolMessage(GetPersonList()[0]);         }          //显示线程池现状          static void ThreadPoolMessage(Person person)         {             int a, b;             ThreadPool.GetAvailableThreads(out a, out b);             string message = string.Format("Person  ID:{0} Name:{1} Age:{2}\n" +                   "  CurrentThreadId is {3}\n  WorkerThreads is:{4}" +                   "  CompletionPortThreads is :{5}\n",                   person.ID, person.Name, person.Age,                   Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());              Console.WriteLine(message);         }          //模拟源数据          static IList
GetPersonList() { var personList = new List
(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } }

运行结果

 

7.3 Task简介

以Thread创建的线程被默认为前台线程,当然你可以把线程IsBackground属性设置为true,但TPL为此提供了一个更简单的类Task。

Task存在于System.Threading.Tasks命名空间当中,它可以作为异步委托的简单替代品。
通过Task的Factory属性将返回TaskFactory类,以TaskFactory.StartNew(Action)方法可以创建一个新线程,所创建的线程默认为后台线程。

1     class Program 2     { 3         static void Main(string[] args) 4         { 5             ThreadPool.SetMaxThreads(1000, 1000); 6             Task.Factory.StartNew(() => ThreadPoolMessage()); 7             Console.ReadKey(); 8         } 9 10         //显示线程池现状 11         static void ThreadPoolMessage()12         {13             int a, b;14             ThreadPool.GetAvailableThreads(out a, out b);15             string message = string.Format("CurrentThreadId is:{0}\n" +16                 "CurrentThread IsBackground:{1}\n" +17                 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n",18                  Thread.CurrentThread.ManagedThreadId,19                  Thread.CurrentThread.IsBackground.ToString(),20                  a.ToString(), b.ToString());21             Console.WriteLine(message);22         }23     }

运行结果

 

 

若要取消处理,可以利用CancellationTakenSource对象,在TaskFactory中包含有方法

public Task StartNew( Action action, CancellationToken cancellationToken )
在 方法中加入CancellationTakenSource对象的CancellationToken属性,可以控制任务的运行,调用 CancellationTakenSource.Cancel时任务就会自动停止。下面以图片下载为例子介绍一下TaskFactory的使用。

服务器端页面

                  

首先在服务器页面中显示多个*.jpg图片,每个图片都有对应的CheckBox检测其选择情况。

所选择图片的路径会记录在Application["Url"]当中传递到Handler.ashx当中。

注意:Application是一个全局变量,此处只是为了显示Task的使用方式,在ASP.NET开发应该慎用Application。

Handler.ashx 处理图片的下载,它从 Application["Url"] 当中获取所选择图片的路径,并把图片转化成byte[]二进制数据。

再把图片的数量,每副图片的二进制数据的长度记录在OutputStream的头部。
最后把图片的二进制数据记入 OutputStream 一并输出。

1 public class Handler : IHttpHandler  2 { 3     public void ProcessRequest(HttpContext context) 4     { 5         //获取图片名,把图片数量写OutputStream  6         List
urlList = (List
)context.Application["Url"]; 7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4); 8 9 //把图片转换成二进制数据 10 List
imageList = GetImages(urlList);11 12 //把每副图片长度写入OutputStream 13 foreach (string image in imageList)14 {15 byte[] imageByte=Convert.FromBase64String(image);16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4);17 }18 19 //把图片写入OutputStream 20 foreach (string image in imageList)21 {22 byte[] imageByte = Convert.FromBase64String(image);23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length);24 }25 }26 27 //获取多个图片的二进制数据 28 private List
GetImages(List
urlList)29 {30 List
imageList = new List
();31 foreach (string url in urlList)32 imageList.Add(GetImage(url));33 return imageList;34 }35 36 //获取单副图片的二进制数据 37 private string GetImage(string url)38 {39 string path = "E:/My Projects/Example/WebSite/Images/"+url;40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read);41 byte[] imgBytes = new byte[10240];42 int imgLength = stream.Read(imgBytes, 0, 10240); 43 return Convert.ToBase64String(imgBytes,0,imgLength);44 }45 46 public bool IsReusable47 {48 get{ return false;}49 }50 }

 

客户端

建立一个WinForm窗口,里面加入一个WebBrowser连接到服务器端的Default.aspx页面。

当按下Download按键时,系统就会利用TaskFactory.StartNew的方法建立异步线程,使用WebRequest方法向Handler.ashx发送请求。
接收到回传流时,就会根据头文件的内容判断图片的数量与每副图片的长度,把二进制数据转化为*.jpg文件保存。

系统利用TaskFactory.StartNew(action,cancellationToken) 方式异步调用GetImages方法进行图片下载。 

当 用户按下Cancel按钮时,异步任务就会停止。值得注意的是,在图片下载时调用了 CancellationToken.ThrowIfCancellationRequested方法,目的在检查并行任务的运行情况,在并行任务被停止 时释放出OperationCanceledException异常,确保用户按下Cancel按钮时,停止所有并行任务。

public partial class Form1 : Form     {         private CancellationTokenSource tokenSource = new CancellationTokenSource();                  public Form1()         {             InitializeComponent();             ThreadPool.SetMaxThreads(1000, 1000);         }          private void downloadToolStripMenuItem_Click(object sender, EventArgs e)         {              Task.Factory.StartNew(GetImages,tokenSource.Token);         }          private void cancelToolStripMenuItem_Click(object sender, EventArgs e)         {             tokenSource.Cancel();         }          private void GetImages()         {             //发送请求,获取输出流              WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx");              Stream responseStream=webRequest.GetResponse().GetResponseStream();              byte[] responseByte = new byte[81960];             IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null);             int responseLength = responseStream.EndRead(result);              //获取图片数量              int imageCount = BitConverter.ToInt32(responseByte, 0);                          //获取每副图片的长度              int[] lengths = new int[imageCount];             for (int n = 0; n < imageCount; n++)             {                 int length = BitConverter.ToInt32(responseByte, (n + 1) * 4);                 lengths[n] = length;             }             try             {                 //保存图片                  for (int n = 0; n < imageCount; n++)                 {                     string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n);                     FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite);                      //计算字节偏移量                      int offset = (imageCount + 1) * 4;                     for (int a = 0; a < n; a++)                         offset += lengths[a];                      file.Write(responseByte, offset, lengths[n]);                     file.Flush();                      //模拟操作                      Thread.Sleep(1000);                      //检测CancellationToken变化                      tokenSource.Token.ThrowIfCancellationRequested();                 }             }             catch (OperationCanceledException ex)             {                 MessageBox.Show("Download cancel!");             }         }     }

 

7.4 并行查询(PLINQ)

并行 LINQ (PLINQ) 是 LINQ 模式的并行实现,主要区别在于 PLINQ 尝试充分利用系统中的所有处理器。 它利用所有处理器的方法,把数据源分成片段,然后在多个处理器上对单独工作线程上的每个片段并行执行查询, 在许多情况下,并行执行意味着查询运行速度显著提高。但这并不说明所有PLINQ都会使用并行方式,当系统测试要并行查询会对系统性能造成损害时,那将自动化地使用同步执行。

在System.Linq.ParallelEnumerable类中,包含了并行查询的大部分方法。
 

方法成员 

说明

AsParallel

PLINQ 的入口点。 指定如果可能,应并行化查询的其余部分。

AsSequential(Of TSource)

指定查询的其余部分应像非并行 LINQ 查询一样按顺序运行。

AsOrdered

指定 PLINQ 应保留查询的其余部分的源序列排序,直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。

AsUnordered(Of TSource)

指定查询的其余部分的 PLINQ 不需要保留源序列的排序。

WithCancellation(Of TSource)

指定 PLINQ 应定期监视请求取消时提供的取消标记和取消执行的状态。

WithDegreeOfParallelism(Of TSource)

指定 PLINQ 应当用来并行化查询的处理器的最大数目。

WithMergeOptions(Of TSource)

提供有关 PLINQ 应当如何(如果可能)将并行结果合并回到使用线程上的一个序列的提示。

WithExecutionMode(Of TSource)

指定 PLINQ 应当如何并行化查询(即使默认行为是按顺序运行查询)。

ForAll(Of TSource)

多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回到使用者线程的情况下并行处理结果。

Aggregate 重载

对于 PLINQ 唯一的重载,它启用对线程本地分区的中间聚合以及一个用于合并所有分区结果的最终聚合函数。

7.4.1 AsParallel

通常想要实现并行查询,只需向数据源添加 AsParallel 查询操作即可。

1     class Program 2     { 3         static void Main(string[] args) 4         { 5             var personList=GetPersonList().AsParallel()  6                    .Where(x=>x.Age>30); 7             Console.ReadKey(); 8         } 9 10         //模拟源数据 11         static IList
GetPersonList()12 {13 var personList = new List
();14 15 var person1 = new Person();16 person1.ID = 1;17 person1.Name = "Leslie";18 person1.Age = 30;19 personList.Add(person1);20 ...........21 return personList;22 }23 }

 

7.4.2 AsOrdered

若要使查询结果必须保留源序列排序方式,可以使用AsOrdered方法。 

AsOrdered依然使用并行方式,只是在查询过程加入额外信息,在并行结束后把查询结果再次进行排列。

class Program     {         static void Main(string[] args)         {             var personList=GetPersonList().AsParallel().AsOrdered()                 .Where(x=>x.Age<30);             Console.ReadKey();         }          static IList
GetPersonList() {......} }

7.4.3 WithDegreeOfParallelism

默认情况下,PLINQ 使用主机上的所有处理器,这些处理器的数量最多可达 64 个。

通过使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定数量的处理器。

1     class Program 2     { 3         static void Main(string[] args) 4         { 5             var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2) 6                 .Where(x=>x.Age<30); 7             Console.ReadKey(); 8         } 9 10         static IList
GetPersonList()11 {.........}12 }

 

7.4.4 ForAll

如果要对并行查询结果进行操作,一般会在for或foreach中执行,执行枚举操作时会使用同步方式。

有见及此,PLINQ中包含了ForAll方法,它可以使用并行方式对数据集进行操作。

class Program     {         static void Main(string[] args)         {             ThreadPool.SetMaxThreads(1000, 1000);             GetPersonList().AsParallel().ForAll(person =>{                 ThreadPoolMessage(person);             });             Console.ReadKey();         }          static IList
GetPersonList() {.......} //显示线程池现状 static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

运行结果

 

7.4.5 WithCancellation

如果需要停止查询,可以使用 WithCancellation(Of TSource) 运算符并提供 CancellationToken 实例作为参数。 

与 第三节Task的例子相似,如果标记上的 IsCancellationRequested 属性设置为 true,则 PLINQ 将会注意到它,并停止所有线程上的处理,然后引发 OperationCanceledException。这可以保证并行查询能够立即停止。

1     class Program 2     { 3         static CancellationTokenSource tokenSource = new CancellationTokenSource(); 4  5         static void Main(string[] args) 6         { 7             Task.Factory.StartNew(Cancel); 8             try 9             {10                 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)11                     .ForAll(person =>12                     {13                         ThreadPoolMessage(person);14                     });15             }16             catch (OperationCanceledException ex)17             { }18             Console.ReadKey();19         }20 21         //在10~50毫秒内发出停止信号 22         static void Cancel()23         {24             Random random = new Random();25             Thread.Sleep(random.Next(10,50));26             tokenSource.Cancel();27         }28 29         static IList
GetPersonList()30 {......}31 32 //显示线程池现状 33 static void ThreadPoolMessage(Person person)34 {35 int a, b;36 ThreadPool.GetAvailableThreads(out a, out b);37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +39 " CompletionPortThreads is :{5}\n",40 person.ID, person.Name, person.Age,41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());42 Console.WriteLine(message);43 }44 }45

 

对JAVA与.NET开发有兴趣的朋友欢迎加入QQ群:162338858