是的,正如这里所说的,一个静态实例的实例成员与静态成员不同,它只是保证线程安全的后者,所以你必须锁定入队和出队操作。
如果锁定被证明是一个瓶颈,队列是简单的收藏品之一在无锁的方式来写,只要一不还需要通过Queue<T>
提供全面落实ICollection<T>
:
internal sealed class LockFreeQueue<T>
{
private sealed class Node
{
public readonly T Item;
public Node Next;
public Node(T item)
{
Item = item;
}
}
private volatile Node _head;
private volatile Node _tail;
public LockFreeQueue()
{
_head = _tail = new Node(default(T));
}
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
public void Enqueue(T item)
{
Node newNode = new Node(item);
for(;;)
{
Node curTail = _tail;
if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null) //append to the tail if it is indeed the tail.
{
Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread.
return;
}
else
{
Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread.
}
}
}
public bool TryDequeue(out T item)
{
for(;;)
{
Node curHead = _head;
Node curTail = _tail;
Node curHeadNext = curHead.Next;
if (curHead == curTail)
{
if (curHeadNext == null)
{
item = default(T);
return false;
}
else
Interlocked.CompareExchange(ref _tail, curHeadNext, curTail); // assist obstructing thread
}
else
{
item = curHeadNext.Item;
if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
{
return true;
}
}
}
}
#pragma warning restore 420
}
该队列只有一个Enqueue
和TryDequeue
(如果队列为空,则返回false)方法。使用联锁增量和减量来添加计数属性并不重要(确保在实际属性中计数字段是不稳定地读取的),但是除此之外,添加任何不能写为委托给一个成员已经定义,或在施工过程中发生(在这种情况下,您将只有一个线程在这一点上使用它,除非你做了一些非常奇怪的事情)。
该实现也是免等待的,就好像一个线程的动作不会阻止另一个线程进展(如果一个线程在第二个线程尝试这样做时通过入队过程的一半,则第二个线程将完成第一个线程的工作)。尽管如此,我还是等到锁定实际上证明了一个瓶颈(除非你只是在试验;玩弄异国情调,与熟悉的工作)。事实上,在许多情况下,这会比锁定Queue<T>
更昂贵,尤其是因为它不太适合将内容彼此靠近保存在内存中,所以您可能会发现由于这个原因,大量的操作接连不断。只要不存在频繁的锁争用,锁定通常相当便宜。
编辑:
我现在有时间在上述工作方式上添加注释。我通过阅读别人的同一个想法的版本来写这篇文章,写这个给我自己来复制这个想法,然后和我之后阅读的版本进行比较,并发现它是一个非常丰富的练习。
让我们从非锁定的免费实现开始。这是一个单独链接的列表。
internal sealed class NotLockFreeYetQueue<T>
{
private sealed class Node
{
public readonly T Item;
public Node Next{get;set;}
public Node(T item)
{
Item = item;
}
}
private Node _head;
private Node _tail;
public NotLockFreeYetQueue()
{
_head = _tail = new Node(default(T));
}
public void Enqueue(T item)
{
Node newNode = new Node(item);
_tail.Next = newNode;
_tail = newNode;
}
public bool TryDequeue(out T item)
{
if (_head == _tail)
{
item = default(T);
return false;
}
else
{
item = _head.Next.Item;
_head = _head.Next;
return true;
}
}
}
到目前为止的实施情况的一些注意事项。
Item
和Next
可合理地为字段或属性。因为它是一个简单的内部类,其中一个必须是readonly
而另一个是“哑”读写(在getter或setter中没有逻辑),在这里实际上没有太多选择。我在这里设置了Next
这是一个属性,纯粹是因为这个功能稍后不会起作用,当我们到达那里时我想谈谈。
有_head
和_tail
开始为指向一个哨兵,而不是通过null
不必对空队列的特殊情况简化了操作。
因此,排队将创建一个新节点,并在成为新尾部之前将其设置为_tail
的Next
属性。出列将检查是否为空,如果不为空,则从头节点获取值并将头设置为旧头的属性为Next
的节点。
此时需要注意的另一件事是,由于新节点是根据需要创建的,而不是预先分配的数组,因此在正常使用中,它的性能会比Queue<T>
低。这不会变得更好,而且我们现在要做的所有事情都会使单线程性能变差。再一次,只有在激烈的争论中,这将打败锁定的Queue<T>
。
让我们使锁入锁。我们将使用Interlocked.CompareExchange()
。这将第一个参数与第三个参数进行比较,如果它们相等,则将第一个参数设置为第二个参数。无论如何它会返回旧值(不管它是否被覆盖)。比较和交换是作为一个原子操作完成的,所以本身就是线程安全的,但是我们需要更多的工作来使这些操作的组合也是线程安全的。
CompareExchange和其它语言的等价物有时简称CAS(用于比较并交换)。
使用它们的一种常见方式是在循环中,我们首先通过正常读取获得我们将重写的值(请记住,.NET读取的32位值,较小的值和引用类型总是原子的),以及尝试覆盖它,如果它没有改变,循环,直到我们成功了:
private sealed class Node
{
public readonly T Item;
public Node Next;
public Node(T item)
{
Item = item;
}
}
/* ... */
private volatile Node _tail;
/* ... */
public void Enqueue(T item)
{
Node newNode = new Node(item);
for(;;)
{
Node curTail = _tail;
if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
{
_tail = newNode;
return;
}
}
}
我们希望添加到尾部的Next
只有当它的null
- 如果不是因为写它的另一个线程。所以,我们做一个只有在这种情况下才能成功的CAS。如果是,我们设置_tail
为新节点,否则我们再试一次。
下,就必须改变是一个字段这个工作,我们不能与性能做到这一点。我们还制作_tail
volatile
,以便_tail
在所有CPU高速缓存中都是新鲜的(CompareExchange
具有易失性语义,因此不会因缺乏波动性而被破坏,但它可能会比所需的频率更频繁地运行, _tail
也)。
这是无锁的,但没有等待。如果一个线程和CAS一样远,但还没有写入_tail,并且一段时间没有任何CPU时间,那么所有其他尝试排队的线程都将保持循环,直到它被调度并设法执行为止。如果线程长时间被中止或暂停,这会导致一种永久活锁。
所以,如果我们在中科院已经失败的情况下,我们是在这样的情况。我们可以通过执行其他线程的工作是解决这一问题:
for(;;)
{
Node curTail = _tail;
if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
{
Interlocked.CompareExchange(ref _tail, newNode, curTail); //CAS in case we were assisted by an obstructed thread.
return;
}
else
{
Interlocked.CompareExchange(ref _tail, curTail.Next, curTail); //assist obstructing thread.
}
}
现在,在大多数情况下写信给curTail.Next
线程将新节点分配给_tail
- 而是通过CAS的情况下,它已经完成。但是,另一个线程无法写入curtail.Next
,它可以尝试将curTail.Next
指定为_tail
,以完成第一个线程的工作并开始自己的工作。
因此,一个无锁,免等待入队。有时间去离职。首先让我们考虑一下我们不怀疑排队空的情况。就像排队一样,我们将首先获取我们感兴趣的节点的本地副本; _head
,_tail
和_head.Next
(再次不使用空头部或尾部空队列使生活更轻松,这意味着它是安全的阅读在任何状态下_head.Next
)。也像入队,我们将取决于波动,这时候不只是_tail
,但_head
,所以我们将其更改为:
private volatile Node _head;
我们改变TryDequeue到:
public bool TryDequeue(out T item)
{
Node curHead = _head;
Node curTail = _tail;
Node curHeadNext = curHead.Next;
if (_head == _tail)
{
item = default(T);
return false;
}
else
{
item = curHeadNext.Item;
if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
return true;
}
}
空 - 现场案件现在是不正确的,但我们会回到那个。它是安全的项目设置为curHeadNext.Item
因为如果我们没有完成,我们将再次覆盖它的操作,但我们必须使操作写入_head
原子,并保证只要_head
没有改变的情况发生。如果没有,那么_head
已被另一个线程更新,我们可以再次循环(无需为该线程工作,它已经完成了所有会影响我们的任务)。
现在考虑如果_head == _tail
会发生什么情况。可能它是空的,但可能_tail.Next
(这将与curHeadNext
相同)是由入队写入的。在这种情况下,我们更可能想要的不是空虚的结果,而是我们将部分排队的项目出队的结果。所以,我们协助该线程,并再次继续循环:
if (curHead == curTail)
{
if (curHeadNext == null)
{
item = default(T);
return false;
}
else
Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);
}
最后,唯一剩下的问题是,我们不断收到420个警告,因为我们传递挥发性领域byref
方法。这往往会阻止易失性语义(因此警告),但不会与CompareExchange
(因此我们这样做)。我们可以禁用警告,包括解释我们为什么这么做的评论(我尝试不要在没有正当评论的情况下禁用警告),并且我们已经提供了前面给出的代码。
请注意,我们在GC支持框架中这样做很重要。如果我们不得不处理重新分配,它将变得更加复杂。
我注意到你不想锁定的评论,因为它已经很慢并且锁定会让它变慢。如果这个代码已经很慢,那么相比之下,锁定的开销会很小。锁定速度通常很快,但人们认为它非常慢。我曾经看到人们在没有进行任何测量的情况下创建了精美的读写器锁。当被迫这样做时,花哨的人往往比较慢。做简单的.net锁定,然后通过配置文件找出你很慢的地方 – pm100 2010-10-06 23:01:35