2013-03-25 47 views
4

我有4个主题。一个是从网络上读取一些信息写入变量,并在每个信号之后发出信号。其中3人正在阅读这个变量,应该只读一次。当前的解决方案是编写者在写入并等待读者事件后设置事件。读者等待事件,然后阅读并设置他们的事件(意思是他们阅读)。问题是读者可以阅读不止一次,而且我有重复的内容。我如何才能达到读者准确读一次的规则?C#One Writer许多读者只阅读一次

+0

我会从任何可行的解决方案更喜欢开始,然后做的更好 – syned 2013-03-25 21:13:22

+1

我想读者应该处理重复写消息。我认为你不应该把它留给客户来假设一次改变事件只会触发一次。例如,有可能导致此行为的网络场景。 – neontapir 2013-03-25 21:15:28

回答

2

一种方法是以下

的数据线程作为一个单向链表之间共享。列表中的每个节点都可以是标记或具有数据。该列表以作为输入到标记的单个节点开始。当读取数据时,会形成一个新的列表,其中包含一系列数据节点,后面跟着一个标记。该列表将追加到添加到列表中的最新标记。

每一个读者线索都从参考原始标记节点和AutoResetEvent开始。每当有新的数据进入写入器时,它就会为每个读取器线程发出信号AutoResetEvent。读者线程将会直接走到找到没有Next节点的标记。

该方案确保所有读者只能看到一次数据。最大的难题是构建列表,以便可以无锁地写入和读取它。这是非常有Interlocked.CompareExchange虽然

链表型直线前进

class Node<T> { 
    public bool IsMarker; 
    public T Data; 
    public Node<T> Next; 
} 

样品作家型

class Writer<T> { 
    private List<AutoResetEvent> m_list; 
    private Node<T> m_lastMarker; 

    public Writer(List<AutoResetEvent> list, Node<T> marker) { 
    m_lastMarker = marker; 
    m_list = list; 
    } 

    // Assuming this can't overlap. If this can overload then you will 
    // need synchronization in this method around the writing of 
    // m_lastMarker  
    void OnDataRead(T[] items) { 
    if (items.Length == 0) { 
     return; 
    } 

    // Build up a linked list of the new data followed by a 
    // Marker to signify the end of the data. 
    var head = new Node<T>() { Data = items[0] }; 
    var current = head; 
    for (int i = 1; i < items.Length; i++) { 
     current.Next = new Node<T>{ Data = items[i] }; 
     current = current.Next; 
    } 
    var marker = new Node<T> { IsMarker = true }; 
    current.Next = marker; 

    // Append the list to the end of the last marker node the writer 
    // created 
    m_lastMarker.Next = head; 
    m_lastMarker = marker; 

    // Tell each of the readers that there is new data 
    foreach (var e in m_list) { 
     e.Set(); 
    } 
    } 
} 

样品读卡类型

class Reader<T> { 
    private AutoResetEvent m_event; 
    private Node<T> m_marker; 

    void Go() { 
    while(true) { 
     m_event.WaitOne(); 
     var current = m_marker.Next; 
     while (current != null) { 
     if (current.IsMarker) { 
      // Found a new marker. Always record the marker because it may 
      // be the last marker in the chain 
      m_marker = current; 
     } else { 
      // Actually process the data 
      ProcessData(current.Data); 
     } 
     current = current.Next; 
     } 
    } 
    } 
} 
+0

这听起来有点复杂...... – syned 2013-03-25 21:21:17

+1

@syned正确的多线程代码很少简单 – JaredPar 2013-03-25 21:23:56

0

我会建议ConcurrentQueue - 它guarnatees每个线程从队列中获得一个唯一的实例。 Here是一个很好的解释如何使用它。

ConnurrentQueue<T>.TryDequeue()是一个线程安全的方法,用于检查队列是否为空,以及是否不从队列中获取项目。既然它同时执行两个操作,程序员不必担心竞态条件。实现这个

+0

因此,作家将新数据添加到队列中,然后读者读取并删除该元素? – syned 2013-03-25 21:17:50

+0

@syned,正确。 TryDequeue()同时兼有 – alexm 2013-03-25 21:32:29

+0

但是编写者知道应该将多少元素添加到该队列中?或者它会自动复制三个元素中的元素? – syned 2013-03-25 21:43:01

0

我想我已经找到了办法去。我创建了2个AutoResetEvent数组,每个读取器有2个事件,等待写入事件和设置读取事件,并且写入器设置所有写入事件并等待所有读取事件。

JaredPar,你的答案是有用的,可以帮助我

1

我的意见,即表示你应该在代码的消费者线程接受相同的值多次获得的可能性同意。也许最简单的方法是为每个更新添加一个顺序标识符。这样,线程可以将顺序标识与它读取的最后一个标识进行比较,并知道它是否重复。

它也知道它是否错过了一个值。

但是,如果你真的需要它们锁定步骤并且只能获得一次值,那么我建议你使用两个ManualResetEvent对象和一个CountdownEvent。以下是如何使用它们。

ManualResetEvent DataReadyEvent = new ManualResetEvent(); 
ManualResetEvent WaitForResultEvent = new ManualResetEvent(); 
CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads); 

读者线程等待DataReadyEvent

当其他线程读取来自网络的价值,它这样做:

Acknowledgement.Reset(NumWaitingThreads); 
DataReadyEvent.Set(); // signal waiting threads to process 
Acknowledgement.WaitOne(); // wait for all threads to signal they got it. 
DataReadyEvent.Reset(); // block threads' reading 
WaitForResultEvent.Set(); // tell threads they can continue 

等待的线程做到这一点:

DataReadyEvent.WaitOne(); // wait for value to be available 
// read the value 
Acknowledgement.Set(); // acknowledge receipt 
WaitForResultEvent.WaitOne(); // wait for signal to proceed 

这与具有每等待两个事件相同的效果线程,但更简单。

但它的缺点是,如果一个线程崩溃,这将挂起倒计时事件。但是,如果生产者线程等待所有的线程消息,你的方法也会如此。

1

这是一个很适合the Barrier class

您可以使用两个Barriers在两个状态之间进行触发。

下面是一个例子:

using System; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    internal class Program 
    { 
     private static void Main(string[] args) 
     { 
      int readerCount = 4; 

      Barrier barrier1 = new Barrier(readerCount + 1); 
      Barrier barrier2 = new Barrier(readerCount + 1); 

      for (int i = 0; i < readerCount; ++i) 
      { 
       Task.Factory.StartNew(() => reader(barrier1, barrier2)); 
      } 

      while (true) 
      { 
       barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point. 

       if ((value % 10000) == 0)  // Print message every so often. 
        Console.WriteLine(value); 

       barrier2.SignalAndWait(); // Wait for the reader threads to read the current value. 
       ++value;     // Produce the next value. 
      } 
     } 

     private static void reader(Barrier barrier1, Barrier barrier2) 
     { 
      int expected = 0; 

      while (true) 
      { 
       barrier1.SignalAndWait(); // Wait for "new data available". 

       if (value != expected) 
       { 
        Console.WriteLine("Expected " + expected + ", got " + value); 
       } 

       ++expected; 
       barrier2.SignalAndWait(); // Signal that we've read the data, and wait for all other threads. 
      } 
     } 

     private static volatile int value; 
    } 
} 
+0

它看起来像个好主意,谢谢,我会调查它 – syned 2013-03-25 22:10:46