复杂,高度封装的东西用起来很简单,但是缺失了灵活性,这篇我们就看看这些好用但灵活性不高的几个并行方法。
一:Invoke
现在电子商务的网站都少不了订单的流程,没有订单的话网站也就没有存活的价值了,往往在订单提交成功后,通常会有这两个操作,第一个:发起
信用卡扣款,第二个:发送emial确认单,这两个操作我们就可以在下单接口调用成功后,因为两个方法是互不干扰的,所以就可以用invoke来玩玩了。
1 static void Main(string[] args)
2 {
3 Parallel.Invoke(Credit, Email);
4
5 Console.Read();
6 }
7
8 static void Credit()
9 {
10 Console.WriteLine("****************** 发起信用卡扣款中 ******************");
11
12 Thread.Sleep(2000);
13
14 Console.WriteLine("扣款成功!");
15 }
16
17 static void Email()
18 {
19 Console.WriteLine("****************** 发送邮件确认单!*****************");
20
21 Thread.Sleep(3000);
22
23 Console.WriteLine("email发送成功!");
24 }
怎么样,实现起来是不是很简单,只要把你需要的方法塞给invoke就行了,不过在这个方法里面有一个重载参数需要注意下,
1 public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);
有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了
MaxDegreeOfParallelism属性。
好了,下面我们大概翻翻invoke里面的代码实现,发现有几个好玩的地方:
<1>: 当invoke中的方法超过10个话,我们发现它走了一个internal可见的ParallelForReplicatingTask的FCL内部专用类,而这个类是继承自
Task的,当方法少于10个的话,才会走常规的Task.
<2> 居然发现了一个装exception 的ConcurrentQueue<Exception>队列集合,多个异常入队后,再包装成AggregateException抛出来。
比如:throw new AggregateException(exceptionQ);
<3> 我们发现,不管是超过10个还是小于10个,都是通过WaitAll来等待所有的执行,所以缺点就在这个地方,如果某一个方法执行时间太长
不能退出,那么这个方法是不是会长期挂在这里不能出来,也就导致了主流程一直挂起,然后页面就一直挂起,所以这个是一个非常危险
的行为,如果我们用task中就可以在waitall中设置一个过期时间,但invoke却没法做到,所以在使用invoke的时候要慎重考虑。
1 try
2 {
3 if (actionsCopy.Length > 10 || (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
4 {
5 ConcurrentQueue<Exception> exceptionQ = null;
6 try
7 {
8 int actionIndex = 0;
9 ParallelForReplicatingTask parallelForReplicatingTask = new ParallelForReplicatingTask(parallelOptions, delegate
10 {
11 for (int l = Interlocked.Increment(ref actionIndex); l <= actionsCopy.Length; l = Interlocked.Increment(ref actionIndex))
12 {
13 try
14 {
15 actionsCopy[l - 1]();
16 }
17 catch (Exception item)
18 {
19 LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());
20 exceptionQ.Enqueue(item);
21 }
22 if (parallelOptions.CancellationToken.IsCancellationRequested)
23 {
24 throw new OperationCanceledException(parallelOptions.CancellationToken);
25 }
26 }
27 }, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating);
28 parallelForReplicatingTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
29 parallelForReplicatingTask.Wait();
30 }
31 catch (Exception ex2)
32 {
33 LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());
34 AggregateException ex = ex2 as AggregateException;
35 if (ex != null)
36 {
37 using (IEnumerator<Exception> enumerator = ex.InnerExceptions.GetEnumerator())
38 {
39 while (enumerator.MoveNext())
40 {
41 Exception current = enumerator.Current;
42 exceptionQ.Enqueue(current);
43 }
44 goto IL_264;
45 }
46 }
47 exceptionQ.Enqueue(ex2);
48 IL_264:;
49 }
50 if (exceptionQ != null && exceptionQ.Count > 0)
51 {
52 Parallel.ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken);
53 throw new AggregateException(exceptionQ);
54 }
55 }
56 else
57 {
58 Task[] array = new Task[actionsCopy.Length];
59 if (parallelOptions.CancellationToken.IsCancellationRequested)
60 {
61 throw new OperationCanceledException(parallelOptions.CancellationToken);
62 }
63 for (int j = 0; j < array.Length; j++)
64 {
65 array[j] = Task.Factory.StartNew(actionsCopy[j], parallelOptions.CancellationToken, TaskCreationOptions.None, InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler);
66 }
67 try
68 {
69 if (array.Length <= 4)
70 {
71 Task.FastWaitAll(array);
72 }
73 else
74 {
75 Task.WaitAll(array);
76 }
77 }
78 catch (AggregateException ex3)
79 {
80 Parallel.ThrowIfReducableToSingleOCE(ex3.InnerExceptions, parallelOptions.CancellationToken);
81 throw;
82 }
83 finally
84 {
85 for (int k = 0; k < array.Length; k++)
86 {
87 if (array[k].IsCompleted)
88 {
89 array[k].Dispose();
90 }
91 }
92 }
93 }
94 }
95 finally
96 {
97 if (TplEtwProvider.Log.IsEnabled())
98 {
99 TplEtwProvider.Log.ParallelInvokeEnd((task != null) ? task.m_taskScheduler.Id : TaskScheduler.Current.Id, (task != null) ? task.Id : 0, forkJoinContextID);
100 }
101 }
二:For
下面再看看Parallel.For,我们知道普通的For是一个串行操作,如果说你的for中每条流程都需要执行一个方法,并且这些方法可以并行操作且
比较耗时,那么为何不尝试用Parallel.For呢,就比如下面的代码。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 List<Action> actions = new List<Action>() { Credit, Email };
6
7 var result = Parallel.For(0, actions.Count, (i) =>
8 {
9 actions[i]();
10 });
11
12 Console.WriteLine("执行状态:" + result.IsCompleted);
13
14 Console.Read();
15 }
16
17 static void Credit()
18 {
19 Console.WriteLine("****************** 发起信用卡扣款中 ******************");
20
21 Thread.Sleep(2000);
22
23 Console.WriteLine("扣款成功!");
24 }
25
26 static void Email()
27 {
28 Console.WriteLine("****************** 发送邮件确认单!*****************");
29
30 Thread.Sleep(3000);
31
32 Console.WriteLine("email发送成功!");
33 }
34 }
下面我们再看看Parallel.For中的最简单的重载和最复杂的重载:
1 public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);
2
3 public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
4
<1> 简单的重载不必多说,很简单,我上面的例子也演示了。
<2> 最复杂的这种重载提供了一个AOP的功能,在每一个body的action执行之前会先执行localInit这个action,在body之后还会执行localFinally
这个action,有没有感觉到已经把body切成了三块?好了,下面看一个例子。
1 static void Main(string[] args)
2 {
3 var list = new List<int>() { 10, 20, 30, 40 };
4
5 var options = new ParallelOptions();
6
7 var total = 0;
8
9 var result = Parallel.For(0, list.Count, () =>
10 {
11 Console.WriteLine("------------ thead --------------");
12
13 return 1;
14 },
15 (i, loop, j) =>
16 {
17 Console.WriteLine("------------ body --------------");
18
19 Console.WriteLine("i=" + list[i] + " j=" + j);
20
21 return list[i];
22 },
23 (i) =>
24 {
25 Console.WriteLine("------------ tfoot --------------");
26
27 Interlocked.Add(ref total, i);
28
29 Console.WriteLine("total=" + total);
30 });
31
32 Console.WriteLine("iscompleted:" + result.IsCompleted);
33 Console.Read();
34 }
View Code
接下来我们再翻翻它的源代码,由于源码太多,里面神乎其神,我就找几个好玩的地方。
<1> 我在里面找到了一个rangeManager分区函数,代码复杂看不懂,貌似很强大。
1 internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
2 {
3 this.m_nCurrentIndexRangeToAssign = 0;
4 this.m_nStep = nStep;
5 if (nNumExpectedWorkers == 1)
6 {
7 nNumExpectedWorkers = 2;
8 }
9 ulong num = (ulong)(nToExclusive - nFromInclusive);
10 ulong num2 = num / (ulong)((long)nNumExpectedWorkers);
11 num2 -= num2 % (ulong)nStep;
12 if (num2 == 0uL)
13 {
14 num2 = (ulong)nStep;
15 }
16 int num3 = (int)(num / num2);
17 if (num % num2 != 0uL)
18 {
19 num3++;
20 }
21 long num4 = (long)num2;
22 this.m_indexRanges = new IndexRange[num3];
23 long num5 = nFromInclusive;
24 for (int i = 0; i < num3; i++)
25 {
26 this.m_indexRanges[i].m_nFromInclusive = num5;
27 this.m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
28 this.m_indexRanges[i].m_bRangeFinished = 0;
29 num5 += num4;
30 if (num5 < num5 - num4 || num5 > nToExclusive)
31 {
32 num5 = nToExclusive;
33 }
34 this.m_indexRanges[i].m_nToExclusive = num5;
35 }
36 }
<2> 我又找到了这个神奇的ParallelForReplicatingTask类。
那么下面问题来了,在单线程的for中,我可以continue,可以break,那么在Parallel.For中有吗?因为是并行,所以continue基本上就没有
存在价值,break的话确实有价值,这个就是委托中的ParallelLoopState做到的,并且还新增了一个Stop。
三:ForEach
其实ForEach和for在本质上是一样的,你在源代码中会发现在底层都是调用一个方法的,而ForEach会在底层中调用for共同的函数之前还会执行
其他的一些逻辑,所以这就告诉我们,能用Parallel.For的地方就不要用Parallel.ForEach,其他的都一样了,这里就不赘述了。