2011-04-30 52 views
3

我有100个传感器,每个“测量”自己的数据。我只有一个DataSender应该从“传感器”发送信息。最近的信息应该发送。什么类型的IProducerConsumerCollection <T>用于我的任务?

通道的带宽可能小于100个传感器产生的数据。在这种情况下,可以跳过一些数据 - 但我们应该“大致公平”。例如,我们可以跳过每个传感器的每一秒测量。

我不知道每个传感器产生数据的频率如何,但通常它们会经常产生数据。

我的其他职位后:

我已经决定,我有古典生产者/消费者问题,有:

  • 100生产者,和
  • 1消费者

我一直建议使用BlockingCollection这一点。 BlockingCollection唯一的问题 - 一旦你有添加项目,你不能取代它。但在我的应用程序中,如果传感器产生新的值,并且以前的值未由Consumer处理,则值应该为替换为

对于该任务,我应该使用ConcurentDictionary还是ConcurentBag

在概念上,我需要是100个元素的阵列。

传感器#33应该取代它的值到数组[33]:

| Sensor | Value | 
|--------|-------| 
|  1 |  | 
|  2 |  | 
|  3 |  | 
/......../......./ 
|  32 |  | 
|  33 | 101.9 | 
|  34 |  | 
/......../......./ 
|  98 |  | 
|  99 |  | 
| 100 |  | 

Consumer应从array[33]采取值,并且如果不为空,然后把它和设定数组[33]为空。 Consumer应对阵列中的任何非空值做出反应。

+0

当有来自不同设备的消息集合中,哪一个你应该送?应该发送来自所有传感器的消息 – svick 2011-04-30 17:42:16

+0

。每个传感器具有同等优先权如果带宽不足以发送来自所有传感器的所有消息,则可以跳过一些数据。例如,我们可以从每个传感器发送每秒二次/三次/四次测量结果 – javapowered 2011-04-30 20:47:18

+0

如果您必须对每个传感器都公平,那么为什么传感器需要说他们有新的数据准备好?消费者可以按顺序轮询每个传感器,看看它是否有新的数据。如果它有新的数据,它会得到最新的并发送它,如果它不是新的,那么它只是跳过它。 – BrandonAGr 2011-04-30 20:56:05

回答

4

我想你应该实现自己的IProducerConsumerCollection<T>。这就是为什么它是一个接口:让你可以轻松地制作自己。

您可以使用Dictionary<K,V>Queue<T>来确保接收数据的公平性,即如果您只有一台设备可非常快速地生成数据,则不会仅从该设备发送数据。

public class DeviceDataQueue<TDevice, TData> 
    : IProducerConsumerCollection<Tuple<TDevice, TData>> 
{ 
    private readonly object m_lockObject = new object(); 
    private readonly Dictionary<TDevice, TData> m_data 
     = new Dictionary<TDevice, TData>(); 
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>(); 

    //some obviously implemented methods elided, just make sure they are thread-safe 

    public int Count { get { return m_queue.Count; } } 

    public object SyncRoot { get { return m_lockObject; } } 

    public bool IsSynchronized { get { return true; } } 

    public bool TryAdd(Tuple<TDevice, TData> item) 
    { 
     var device = item.Item1; 
     var data = item.Item2; 

     lock (m_lockObject) 
     { 
      if (!m_data.ContainsKey(device)) 
       m_queue.Enqueue(device); 

      m_data[device] = data; 
     } 

     return true; 
    } 

    public bool TryTake(out Tuple<TDevice, TData> item) 
    { 
     lock (m_lockObject) 
     { 
      if (m_queue.Count == 0) 
      { 
       item = null; 
       return false; 
      } 

      var device = m_queue.Dequeue(); 
      var data = m_data[device]; 
      m_data.Remove(device); 
      item = Tuple.Create(device, data); 
      return true; 
     } 
    } 
} 

当沿着这些线路中使用:

Queue = new BlockingCollection<Tuple<IDevice, Data>>(
    new DeviceDataQueue<IDevice, Data>()); 

Device1 = new Device(1, TimeSpan.FromSeconds(3), Queue); 
Device2 = new Device(2, TimeSpan.FromSeconds(5), Queue); 

while (true) 
{ 
    var tuple = Queue.Take(); 
    var device = tuple.Item1; 
    var data = tuple.Item2; 

    Console.WriteLine("{0}: Device {1} produced data at {2}.", 
     DateTime.Now, device.Id, data.Created); 

    Thread.Sleep(TimeSpan.FromSeconds(2)); 
} 

它产生以下输出:

30.4.2011 20:40:43: Device 1 produced data at 30.4.2011 20:40:43. 
30.4.2011 20:40:45: Device 2 produced data at 30.4.2011 20:40:44. 
30.4.2011 20:40:47: Device 1 produced data at 30.4.2011 20:40:47. 
30.4.2011 20:40:49: Device 2 produced data at 30.4.2011 20:40:49. 
30.4.2011 20:40:51: Device 1 produced data at 30.4.2011 20:40:51. 
30.4.2011 20:40:54: Device 2 produced data at 30.4.2011 20:40:54. 
+0

谢谢,我认为这应该适合我!不过,我仍在调查我是否可以使用标准集合实现我想要的内容,可能是ConcurrentDictionary或者使用Victor Hurdugaci建议的一些“技巧”。你的建议应该可行,但可能我们可以做得更容易... – javapowered 2011-04-30 21:06:36

+0

毕竟我想我只想存储更新传感器的“索引”。实际的传感器数据存储在一个地方,我可以通过参考访问它。所以我不需要重复ConcurentQueue,我将存储“整数” - 传感器索引。为我的新问题创建单独的线程http://stackoverflow.com/questions/5845241/how-to-create-no-duplicates-concurrentqueue – javapowered 2011-04-30 23:14:24

+0

嗯,我的解决方案做到了。如果您不想为每个设备存储任何数据,则可以使用'HashSet '而不是'Dictionary'。或者你可以检查'Queue'本身,但是这样会比较慢(至少渐近地说,它可能足够满足你的需求)。 – svick 2011-05-01 00:35:44

1

而不是使用另一个数据结构,另一个窍门。您的集合中的元素不能被替换,但你可以,而不是存储的实际值,存储迷你容器。当你想更换,你实际上是在容器更换,而不是更换容器中的价值。

class ElementFromQueue 
{ 
    public object SensorData; 
} 

... 

ElementFromQueue elem = new ElementFromQueue(); 
elem.SensorData = new object(); 
... 
queue.Add(elem); //Element is in queue now 
... 
elem.SensorData = new object(); //Update the data, simulating replace 

或者只是创建一个指向队列的传感器编号。当值被弹出,最新的传感器值从另一个查询,更新,能够收集

+1

如果我创建索引队列,我需要每个索引存在一次,否则我可以两次发送相同的传感器数据。即我需要“设置”indixes :)但通常的做法是intersting,我会尝试应用它。 – javapowered 2011-04-30 17:57:58

相关问题