0
点赞
收藏
分享

微信扫一扫

Rx:4-[编外篇] .NET4里的Concurrent Collections


在.NET 4.0之前,是不直接支持线程安全的集合的。除非自己去做lock。而Lock带给我们的除了风险之外,系统的性能下降问题特别明显。在我之前的项目中,需要用到一个Thread-safe的Queue,我当时使用Compare-And-Swap方式实现的。但是这种方式并不能保证高效,个人能力有限,Google也没帮上多大忙。所以.net 4.0一出来的时候,宣布支持ConcurrentQueue<T>,我立刻就抄起Reflector去看看它的实现方式。

当然,除了ConcurrentQueue<T>外,还有其他几个支持线程安全的并发集合类型,看完之后,才发现他们并不全都是Lock-free的,这点还是跟我的判断不同的。

我们常说的Lock-free,对于.net来讲,其实是说就是不用lock(),也就是Monitor这个类。在Team的技术分享中我多次建议大家不要直接写lock,而是写Monitor.TryEnter(object, 200)这样的方式,这样的好处是避免整个程序面临尴尬的状况,毕竟对于一个互联网来说,绝大多数应用并不是追求完整性的,牺牲整个站点的代价太高了。在.net里,我们可以通过memory barrier或者使用CPU的CAS(就是上面说的Compare-And-Swap,那个Interlocked类就是)指令来进行一些粒度锁,这些都是很轻量级的,我们认为他是不会损坏系统性能的。

注:(感谢@​​Kavin Yang​​的提醒)

使用lock()的时候,.net会一直等待能够lock才行。而lock()的实现就是调用Monitor.Enter。但是Monitor还有一个叫做TryEnter的方法,参数除了要lock的object外,还提供一个时间参数作为该操作的超时时间。

可以参见:​​http://msdn.microsoft.com/en-us/library/42h9d380(v=VS.80).aspx​​

那看几个实现:

ConcurrentQueue<T>和ConcurrentStack<T>

这两个是使用CAS的方式来完成任务的,实现原理是一样的:

1: private void PushCore(Node<T> head, Node<T> tail)<!--CRLF-->

2: {<!--CRLF-->

3: SpinWait wait = new SpinWait();<!--CRLF-->

4: do<!--CRLF-->

5: {<!--CRLF-->

6: wait.SpinOnce();<!--CRLF-->

7: tail.m_next = this.m_head;<!--CRLF-->

8: }<!--CRLF-->

9: while (Interlocked.CompareExchange<Node<T>>(ref this.m_head, head, tail.m_next) != tail.m_next);<!--CRLF-->

10: }<!--CRLF-->

注意到do…while。整个实现都没有使用lock的地方,唯独在CAS失败的时候使用SpinWait的SpinOnce方法。
ConcurrentDictionary<TKey, TValue>
这个类也在mscorlib.dll里,同样位于System.Collections.Concurrent命名空间下。但是它的add、update等方法都是使用了lock来完成的(当然不是完全依赖lock,所以有性能提升)

1: try<!--CRLF-->

2: {<!--CRLF-->

3: if (acquireLock)<!--CRLF-->

4: {<!--CRLF-->

5: Monitor.Enter(this.m_locks[num3], ref lockTaken);<!--CRLF-->

6: }<!--CRLF-->

7: if (nodeArray != this.m_buckets)<!--CRLF-->

8: {<!--CRLF-->

9: goto Label_000D;<!--CRLF-->

10: }<!--CRLF-->

11: Node<TKey, TValue> node = null;<!--CRLF-->

12: ....<!--CRLF-->

但是对于他的读操作:

1: public bool TryGetValue(TKey key, out TValue value)<!--CRLF-->

2: {<!--CRLF-->

3: int num;<!--CRLF-->

4: int num2;<!--CRLF-->

5: if (key == null)<!--CRLF-->

6: {<!--CRLF-->

7: throw new ArgumentNullException("key");<!--CRLF-->

8: }<!--CRLF-->

9: Node<TKey, TValue>[] buckets = this.m_buckets;<!--CRLF-->

10: this.GetBucketAndLockNo(this.m_comparer.GetHashCode(key), out num, out num2, buckets.Length);<!--CRLF-->

11: Node<TKey, TValue> next = buckets[num];<!--CRLF-->

12: Thread.MemoryBarrier();<!--CRLF-->

13: while (next != null)<!--CRLF-->

14: {<!--CRLF-->

15: if (this.m_comparer.Equals(next.m_key, key))<!--CRLF-->

16: {<!--CRLF-->

17: value = next.m_value;<!--CRLF-->

18: return true;<!--CRLF-->

19: }<!--CRLF-->

20: next = next.m_next;<!--CRLF-->

21: }<!--CRLF-->

22: value = default(TValue);<!--CRLF-->

23: return false;<!--CRLF-->

24: }<!--CRLF-->

能看到这里的处理方式还是很巧妙的,Dictionary这个数据结构本身就是一个设计为读多的类型,read的次数肯定要远远超过write的次数。所以这个ConcrruentDictionary的设计并不是追求lock-free的改/写操作,而是将read操作做到了lock-free!
ConcrruntBat<T>
这个类设计的更有意思,思路很像是采用了Pool的方式(Pool一般有3种常见的使用方式),将pool对应到应用的thread上,他们之间就没有锁的我问题了(或者无损耗的锁),比如说Add操作:

1: public void Add(T item)<!--CRLF-->

2: {<!--CRLF-->

3: ThreadLocalList<T> threadList = this.GetThreadList(true);<!--CRLF-->

4: this.AddInternal(threadList, item);<!--CRLF-->

5: }<!--CRLF-->


1: private void AddInternal(ThreadLocalList<T> list, T item)<!--CRLF-->

2: {<!--CRLF-->

3: bool taken = false;<!--CRLF-->

4: try<!--CRLF-->

5: {<!--CRLF-->

6: Interlocked.Exchange(ref list.m_currentOp, 1);<!--CRLF-->

7: if ((list.Count < 2) || this.m_needSync)<!--CRLF-->

8: {<!--CRLF-->

9: list.m_currentOp = 0;<!--CRLF-->

10: Monitor2.Enter(list, ref taken);<!--CRLF-->

11: }<!--CRLF-->

12: list.Add(item, taken);<!--CRLF-->

13: }<!--CRLF-->

14: finally<!--CRLF-->

15: {<!--CRLF-->

16: list.m_currentOp = 0;<!--CRLF-->

17: if (taken)<!--CRLF-->

18: {<!--CRLF-->

19: Monitor.Exit(list);<!--CRLF-->

20: }<!--CRLF-->

21: }<!--CRLF-->

22: }<!--CRLF-->


Rx系列:
​​Rx:1-Observable​​
​​Rx:2-Observable more​​
​​Rx:3-System.CoreEx.dll​​

举报

相关推荐

0 条评论