2012-01-05 40 views
0

我想现有的转换过程中,它支持多线程和并发性,使解决方案的更多强大和可靠的方式。实现线程安全的,并行处理

采取紧急警报系统的例子。当工作人员开始工作时,将使用他们的信息创建一个新的收件人对象并将其添加到收件人集合中。相反,当它们输出时,对象被移除。在后台发生警报时,警报引擎将遍历相同的收件人列表(foreach),并在每个对象上调用SendAlert(...)。

下面是我的一些要求:

  • 将接收者如果警报过程中不应该阻拦。
  • 如果警报正在进行,删除收件人不应阻止。
  • 添加或删除收件人应该不会影响通过正在进行的警报使用 收件人列表。

我一直在寻找任务和并行类以及BlockingCollection和ConcurrentQueue类,但我不清楚最好的方法是什么。

它是那样简单使用BlockingCollection?在阅读大量文档后,我仍然不确定如果在枚举集合时调用Add,会发生什么情况。

UPDATE

一个collegue叫我去下面的文章描述了ConcurrentBag类和每个操作的行为方式:根据作者的解释

http://www.codethinked.com/net-40-and-system_collections_concurrent_concurrentbag

,看来这个集合将(几乎)为我的目的服务。我能做到以下几点:

  1. 创建一个新的集合

    VAR收件人=新ConcurrentBag();

  2. 当工人的时钟中,创建一个新的收件人,并将其添加到集合:

    recipients.Add(新的收件人());

  3. 当发生警报时,警报引擎可以通过当时的集合,因为GetEnumerator的使用藏品的快照迭代。

    的foreach(在收件人VAR收件人) recipient.SendAlert(...);

  4. 当工人的时钟输出,从集合中删除收件人:

    ???

的ConcurrentBag不提供一种方法来删除特定项目。据我所知,没有任何并发​​类。我错过了什么吗?除此之外,ConcurrentBag可以完成我所需要的一切。

回答

1

ConcurrentBag<T>绝对应该是表现最好的类出一堆供你使用了这样的情况。枚举完全按照您的朋友描述的方式工作,因此它应该适合您已布局的场景。但是,知道你必须从这个集合中删除特定的项目,唯一适用于你的类型是ConcurrentDictionary<K, V>。所有其他类型仅提供TryTake方法,其在ConcurrentBag<T>的情况下不确定,或者仅在ConcurrentQueue<T>ConcurrentStack<T>的情况下排序。

对于广播你只是做:

ConcurrentDictionary<string, Recipient> myConcurrentDictionary = ...; 

... 

foreach(Recipient recipient in myConcurrentDictionary.Values) 
{ 
    ... 
} 

调查员再次是在那一瞬间字典的快照。

+0

这似乎是一个更清洁的选项。我会试一试! – SonOfPirate 2012-01-09 16:07:28

0

像这样的实现还有很多不仅仅是并行处理方面,耐久性可能是其中最重要的。您是否考虑使用现有的PubSub技术来构建此类技术,如说... Azure TopicsNServiceBus

+0

由于知识产权限制,我无法给出解决方案的确切性质,所以我用过的例子就是这样 - 一个例子。其目的是强调一个场景,所以可以回答关于创建一个线程安全的并行操作集合的问题。诸如耐用性,容错性等方面是绝对值得关注的,但超出了本文的范围。我想专注于问题的这一方面。 – SonOfPirate 2012-01-06 01:09:06

-1

您的要求让我非常适合标准.NET事件在C#中触发的方式。我不知道如果VB语法被编译成类似的代码或不是。标准模式如下所示:

public event EventHandler Triggered; 
protected void OnTriggered() 
{ 
    //capture the list so that you don't see changes while the 
    //event is being dispatched. 
    EventHandler h = Triggered; 
    if (h != null) 
     h(this, EventArgs.Empty); 
} 

或者,您可以使用不可变列表类来存储收件人。然后,在发送警报时,它会首先将当前列表作为“快照”,在发送警报时无法通过添加和删除进行修改。例如:

class Alerter 
{ 
    private ImmutableList<Recipient> recipients; 

    public void Add(Recipient recipient) 
    { 
     recipients = recipients.Add(recipient); 
    } 

    public void Remove(Recipient recipient) 
    { 
     recipients = recipients.Remove(recipient); 
    } 

    public void SendAlert() 
    { 
     //make a local reference to the current list so 
     //you are not affected by any calls to Add/Remove 
     var current = recipients; 
     foreach (var r in current) 
     { 
      //send alert to r 
     } 
    } 
} 

您将不得不找到一个ImmutableList的实现,但是您应该可以在没有太多工作的情况下找到几个实例。在我写它的SendAlert方法中,我可能不需要做一个明确的本地来避免问题,因为foreach循环本身可以做到这一点,但我认为副本使意图更清晰。

+0

手头的问题是线程安全和并行操作。这个建议忽略了两个方面。 – SonOfPirate 2012-01-06 01:04:59

+0

@SonOfPirate这个建议处理你问的问题。你提到考虑'BlockingCollection',但担心在枚举时调用Add时会发生什么。这两个建议都以线程安全的方式解决了这个问题,并允许并行操作。如果这些看起来不能解决你的问题,那么你可能会问一个更直接的问题,而不是“像使用BlockingCollection一样简单吗?” – 2012-01-06 02:28:22

+0

您的解决方案不是线程安全的。恰当的例子就是在迭代SendAlert中的集合时调用Add时发生的情况。 (在您的代码中,由于底层集合已更改,将抛出异常,因此枚举器将不再有效。) – SonOfPirate 2012-01-06 02:42:52

1

今天早上我来到工作电子邮件从一个朋友给了我下面的两个答案:

1 - 至于在并发命名空间是如何工作的藏品,其中大部分旨在允许在不阻塞的情况下对集合进行增加和减少,并且即使在枚举收集项目的过程中也是线程安全的。

有了“正规军”的收集,得到一个枚举(通过的GetEnumerator)设置由影响收集的物品(如添加,删除或清除)的任何操作改变了“版本”的值。 IEnumerator实现将比较它创建时的版本集合与当前版本的集合。如果不同,则抛出异常并停止枚举。

并发集合的设计使用了使得它非常容易支持多线程的细分。但是,在枚举的情况下,他们实际上在调用GetEnumerator时创建集合的快照副本,并且枚举器对此副本起作用。这允许对收款人进行修改而不会对统计员产生不利影响。当然,这意味着枚举对这些更改一无所知,但听起来像您的用例允许这样做。

2 - 就您所描述的具体情况而言,我不相信需要Concurrent集合。您可以使用ReaderWriterLock封装标准集合,并在枚举时应用与Concurrent集合相同的逻辑。

这里是我的建议:

public class RecipientCollection 
{ 
    private Collection<Recipient> _recipients = new Collection<Recipient>(); 
    private ReaderWriterLock _lock = new ReaderWriterLock(); 

    public void Add(Recipient r) 
    { 
     _lock.AcquireWriterLock(Timeout.Infinite); 

     try 
     { 
      _recipients.Add(r); 
     } 
     finally 
     { 
      _lock.ReleaseWriterLock(); 
     } 
    } 

    public void Remove(Recipient r) 
    { 
     _lock.AcquireWriterLock(Timeout.Infinite); 

     try 
     { 
      _recipients.Remove(r); 
     } 
     finally 
     { 
      _lock.ReleaseWriterLock(); 
     } 
    } 

    public IEnumerable<Recipient> ToEnumerable() 
    { 
     _lock.AcquireReaderLock(Timeout.Infinite); 

     try 
     { 
      var list = _recipients.ToArray(); 

      return list; 
     } 
     finally 
     { 
      _lock.ReleaseReaderLock(); 
     } 
    } 
} 

的ReaderWriterLock确保如果另一个改变集合的内容操作过程中的操作仅阻止。只要该操作完成,锁定就会被释放,并且下一个操作可以继续。

您的警报引擎将使用ToEnumerable()方法获取当时集合的快照副本并枚举副本。

根据警报的发送频率和变化都对集合进行,这可能是一个问题,但你也许能仍实现某种类型的版本属性时,将产品添加或更改删除,警报引擎可以检查该属性以查看是否需要再次调用ToEnumerable()以获取最新版本。或者通过将数组缓存到RecipientCollection类中并在添加或删除项目时使缓存失效来对其进行封装。

HTH