2010-10-06 94 views
9

msdn文档指出静态通用队列是线程安全的。这是否意味着下面的代码是线程安全的?换句话说,当一个线程入队一个int和另一个线程同时出队一个int时有问题吗?为了线程安全,我必须锁定Enqueue和Dequeue操作吗?这是使用静态队列线程安全的吗?

class Test { 
    public static Queue<int> queue = new Queue<int>(10000); 

    Thread putIntThread; 
    Thread takeIntThread; 

    public Test() { 
     for(int i = 0; i < 5000; ++i) { 
      queue.Enqueue(0); 
     } 
     putIntThread = new Thread(this.PutInt); 
     takeIntThread = new Thread(this.TakeInt); 
     putIntThread.Start(); 
     takeIntThread.Start(); 
    } 

    void PutInt() { 
     while(true) 
     { 
      if(queue.Count < 10000) {//no need to lock here as only itself can change this condition 
       queue.Enqueue(0); 
      } 
     } 
    } 

    void TakeInt() { 
     while(true) { 
      if(queue.Count > 0) {//no need to lock here as only itself can change this condition 
       queue.Dequeue(); 
      } 
     } 
    } 

} 

编辑:我必须使用.NET 3.5

+0

我注意到你不想锁定的评论,因为它已经很慢并且锁定会让它变慢。如果这个代码已经很慢,那么相比之下,锁定的开销会很小。锁定速度通常很快,但人们认为它非常慢。我曾经看到人们在没有进行任何测量的情况下创建了精美的读写器锁。当被迫这样做时,花哨的人往往比较慢。做简单的.net锁定,然后通过配置文件找出你很慢的地方 – pm100 2010-10-06 23:01:35

回答

23

这绝对不是线程安全的。从Queue<T>的文档。

此类型的公共静态(Visual Basic中的Shared)成员是线程安全的。任何实例成员不保证是线程安全的。

A Queue<T>可以同时支持多个阅读器,只要该集合未被修改即可。即便如此,枚举集合本质上不是一个线程安全的过程。为了确保枚举期间的线程安全性,您可以在整个枚举过程中锁定集合。为了让集合可以被多个线程读取和写入,您必须实现自己的同步。

重读你的问题,你似乎对“这种类型的静态成员”这个短语感到困惑 - 它不是在谈论“静态队列”,因为没有这种东西。一个对象不是静态的 - 一个成员。当它谈到静态成员时,它正在讨论诸如Encoding.GetEncodingQueue<T>实际上没有任何静态成员)。实例成员是类似于EnqueueDequeue的东西 - 与类型实例相关的成员,而不是类型本身。

因此,无论您需要为每个操作使用锁还是使用.NET 4,都可以使用ConcurrentQueue<T>

+1

每当我读到你的答案时,我都会学到新的东西!感谢您来到SO。 :) – Nayan 2010-10-06 08:38:32

+0

谢谢。的确,我误解了“这种类型的静态成员”。 – blizpasta 2010-10-06 09:04:33

1

是的,你必须锁定就像MSDN

要允许由多个线程用于读取和写入访问的集合,则必须实现自己的同步。

2

MSDN声明的是,Queue的静态方法是线程安全的,而不是静态实例的实例方法是线程安全的。

+1

对不起,这张贴在Jon的答案的第1版和第2版之间。 – Timores 2010-10-06 08:53:22

+0

没有必要道歉,这不是问题。 – blizpasta 2010-10-06 09:04:09

+0

不是一个真正的问题,但是当我开始打字时,我的答案有一定的价值。当我完成时,Jon已经更新了他的帖子,而我自己的帖子已经不再有任何价值了.-) – Timores 2010-10-06 10:16:39

4

是的,正如这里所说的,一个静态实例的实例成员与静态成员不同,它只是保证线程安全的后者,所以你必须锁定入队和出队操作。

如果锁定被证明是一个瓶颈,队列是简单的收藏品之一在无锁的方式来写,只要一不还需要通过Queue<T>提供全面落实ICollection<T>

internal sealed class LockFreeQueue<T> 
{ 
    private sealed class Node 
    { 
    public readonly T Item; 
    public Node Next; 
    public Node(T item) 
    { 
     Item = item; 
    } 
    } 
    private volatile Node _head; 
    private volatile Node _tail; 
    public LockFreeQueue() 
    { 
    _head = _tail = new Node(default(T)); 
    } 
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked 
    public void Enqueue(T item) 
    { 
    Node newNode = new Node(item); 
    for(;;) 
    { 
     Node curTail = _tail; 
     if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail. 
     { 
     Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. 
     return; 
     } 
     else 
     { 
     Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. 
     } 
    } 
    }  
    public bool TryDequeue(out T item) 
    { 
    for(;;) 
    { 
     Node curHead = _head; 
     Node curTail = _tail; 
     Node curHeadNext = curHead.Next; 
     if (curHead == curTail) 
     { 
     if (curHeadNext == null) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
      Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread 
     } 
     else 
     { 
     item = curHeadNext.Item; 
     if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) 
     { 
      return true; 
     } 
     } 
    } 
    } 
#pragma warning restore 420 
} 

该队列只有一个EnqueueTryDequeue(如果队列为空,则返回false)方法。使用联锁增量和减量来添加计数属性并不重要(确保在实际属性中计数字段是不稳定地读取的),但是除此之外,添加任何不能写为委托给一个成员已经定义,或在施工过程中发生(在这种情况下,您将只有一个线程在这一点上使用它,除非你做了一些非常奇怪的事情)。

该实现也是免等待的,就好像一个线程的动作不会阻止另一个线程进展(如果一个线程在第二个线程尝试这样做时通过入队过程的一半,则第二个线程将完成第一个线程的工作)。尽管如此,我还是等到锁定实际上证明了一个瓶颈(除非你只是在试验;玩弄异国情调,与熟悉的工作)。事实上,在许多情况下,这会比锁定Queue<T>更昂贵,尤其是因为它不太适合将内容彼此靠近保存在内存中,所以您可能会发现由于这个原因,大量的操作接连不断。只要不存在频繁的锁争用,锁定通常相当便宜。

编辑:

我现在有时间在上述工作方式上添加注释。我通过阅读别人的同一个想法的版本来写这篇文章,写这个给我自己来复制这个想法,然后和我之后阅读的版本进行比较,并发现它是一个非常丰富的练习。

让我们从非锁定的免费实现开始。这是一个单独链接的列表。

internal sealed class NotLockFreeYetQueue<T> 
{ 
    private sealed class Node 
    { 
    public readonly T Item; 
    public Node Next{get;set;} 
    public Node(T item) 
    { 
     Item = item; 
    } 
    } 
    private Node _head; 
    private Node _tail; 
    public NotLockFreeYetQueue() 
    { 
    _head = _tail = new Node(default(T)); 
    } 
    public void Enqueue(T item) 
    { 
    Node newNode = new Node(item); 
    _tail.Next = newNode; 
    _tail = newNode; 
    } 
    public bool TryDequeue(out T item) 
    { 
     if (_head == _tail) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
     { 
     item = _head.Next.Item; 
     _head = _head.Next; 
     return true; 
     } 
    } 
} 

到目前为止的实施情况的一些注意事项。

ItemNext可合理地为字段或属性。因为它是一个简单的内部类,其中一个必须是readonly而另一个是“哑”读写(在getter或setter中没有逻辑),在这里实际上没有太多选择。我在这里设置了Next这是一个属性,纯粹是因为这个功能稍后不会起作用,当我们到达那里时我想谈谈。

_head_tail开始为指向一个哨兵,而不是通过null不必对空队列的特殊情况简化了操作。

因此,排队将创建一个新节点,并在成为新尾部之前将其设置为_tailNext属性。出列将检查是否为空,如果不为空,则从头节点获取值并将头设置为旧头的属性为Next的节点。

此时需要注意的另一件事是,由于新节点是根据需要创建的,而不是预先分配的数组,因此在正常使用中,它的性能会比Queue<T>低。这不会变得更好,而且我们现在要做的所有事情都会使单线程性能变差。再一次,只有在激烈的争论中,这将打败锁定的Queue<T>

让我们使锁入锁。我们将使用Interlocked.CompareExchange()。这将第一个参数与第三个参数进行比较,如果它们相等,则将第一个参数设置为第二个参数。无论如何它会返回旧值(不管它是否被覆盖)。比较和交换是作为一个原子操作完成的,所以本身就是线程安全的,但是我们需要更多的工作来使这些操作的组合也是线程安全的。

CompareExchange和其它语言的等价物有时简称CAS(用于比较并交换)。

使用它们的一种常见方式是在循环中,我们首先通过正常读取获得我们将重写的值(请记住,.NET读取的32位值,较小的值和引用类型总是原子的),以及尝试覆盖它,如果它没有改变,循环,直到我们成功了:

private sealed class Node 
{ 
    public readonly T Item; 
    public Node Next; 
    public Node(T item) 
    { 
    Item = item; 
    } 
} 
/* ... */ 
private volatile Node _tail; 
/* ... */ 
public void Enqueue(T item) 
{ 
    Node newNode = new Node(item); 
    for(;;) 
    { 
    Node curTail = _tail; 
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) 
    { 
     _tail = newNode; 
     return; 
    } 
    } 
} 

我们希望添加到尾部的Next只有当它的null - 如果不是因为写它的另一个线程。所以,我们做一个只有在这种情况下才能成功的CAS。如果是,我们设置_tail为新节点,否则我们再试一次。

下,就必须改变是一个字段这个工作,我们不能与性能做到这一点。我们还制作_tailvolatile,以便_tail在所有CPU高速缓存中都是新鲜的(CompareExchange具有易失性语义,因此不会因缺乏波动性而被破坏,但它可能会比所需的频率更频繁地运行, _tail也)。

这是无锁的,但没有等待。如果一个线程和CAS一样远,但还没有写入_tail,并且一段时间没有任何CPU时间,那么所有其他尝试排队的线程都将保持循环,直到它被调度并设法执行为止。如果线程长时间被中止或暂停,这会导致一种永久活锁。

所以,如果我们在中科院已经失败的情况下,我们是在这样的情况。我们可以通过执行其他线程的工作是解决这一问题:

for(;;) 
    { 
    Node curTail = _tail; 
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) 
    { 
     Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread. 

     return; 
    } 
    else 
    { 
     Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread. 
    } 
    } 

现在,在大多数情况下写信给curTail.Next线程将新节点分配给_tail - 而是通过CAS的情况下,它已经完成。但是,另一个线程无法写入curtail.Next,它可以尝试将curTail.Next指定为_tail,以完成第一个线程的工作并开始自己的工作。

因此,一个无锁,免等待入队。有时间去离职。首先让我们考虑一下我们不怀疑排队空的情况。就像排队一样,我们将首先获取我们感兴趣的节点的本地副本; _head_tail_head.Next(再次不使用空头部或尾部空队列使生活更轻松,这意味着它是安全的阅读在任何状态下_head.Next)。也像入队,我们将取决于波动,这时候不只是_tail,但_head,所以我们将其更改为:

private volatile Node _head; 

我们改变TryDequeue到:

public bool TryDequeue(out T item) 
    { 
     Node curHead = _head; 
     Node curTail = _tail; 
     Node curHeadNext = curHead.Next; 
     if (_head == _tail) 
     { 
      item = default(T); 
      return false; 
     } 
     else 
     { 
     item = curHeadNext.Item; 
     if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead) 
      return true; 
     } 
    } 

空 - 现场案件现在是不正确的,但我们会回到那个。它是安全的项目设置为curHeadNext.Item因为如果我们没有完成,我们将再次覆盖它的操作,但我们必须使操作写入_head原子,并保证只要_head没有改变的情况发生。如果没有,那么_head已被另一个线程更新,我们可以再次循环(无需为该线程工作,它已经完成了所有会影响我们的任务)。

现在考虑如果_head == _tail会发生什么情况。可能它是空的,但可能_tail.Next(这将与curHeadNext相同)是由入队写入的。在这种情况下,我们更可能想要的不是空虚的结果,而是我们将部分排队的项目出队的结果。所以,我们协助该线程,并再次继续循环:

if (curHead == curTail) 
{ 
    if (curHeadNext == null) 
    { 
     item = default(T); 
     return false; 
    } 
    else 
     Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); 
} 

最后,唯一剩下的问题是,我们不断收到420个警告,因为我们传递挥发性领域byref方法。这往往会阻止易失性语义(因此警告),但不会与CompareExchange(因此我们这样做)。我们可以禁用警告,包括解释我们为什么这么做的评论(我尝试不要在没有正当评论的情况下禁用警告),并且我们已经提供了前面给出的代码。

请注意,我们在GC支持框架中这样做很重要。如果我们不得不处理重新分配,它将变得更加复杂。

+0

+1:谢谢!你似乎能够阅读我的想法,从我的问题推断出,如果可能的话,我想避免锁定。我目前的代码没有锁,我想确认我对msdn文档的(错误)解释。它已经太慢了,添加锁会让它更慢。如果我无法提高速度,我可能会最终尝试您的建议,尽管我必须承认我还不了解您的代码(这是我的工作内容)。 – blizpasta 2010-10-06 15:52:20

+0

我很乐意解释上面的代码,虽然也许它与原始主题相距甚远,并且在其他地方最好。我会非常**怀疑这会有所帮助。锁并不昂贵;有争议的锁可能是昂贵的(有时是非常残酷的),但没有争议的锁不会成为瓶颈(当然,当它们被争议时恰恰是你应该感到高兴的时候)。同时,由于'Queue '的工作方式,从CPU频繁访问它可能比我在这里得到的代码更快,因为... – 2010-10-06 15:58:11

+0

...项目在内存中彼此更接近,因此更可能在CPU缓存中。另外一个简单的读写操作(在队列中使用)比我课上使用的互锁操作要快。在无争议的情况下,我希望'队列'击倒我的课,即使增加了锁。所以,虽然上述*可能会有用,但我只会认为,如果高锁定争用被证明是一个真正的问题(或为了尝试新的做事方式的乐趣,这就是为什么我有上述的手[基于在阅读其他代码时,不是100%原创]) – 2010-10-06 16:03:33