0

我的任务是优化线性数据处理例程的性能。以下是已有内容的概述:实时批处理数据处理

数据进入UDP端口,我们有多个侦听器在不同的端口上侦听,并将原始数据写入SQL Server数据库(让我们称之为RawData)。然后,我们有一个单线程线性应用程序的多个实例,它从RawData表中抓取原始数据并处理单个数据行。处理的意思是将原始数据与给定实体的先前接收的数据进行比较,计算完成以计算不同读数的数量,然后针对每个单独的数据行调用几个Web服务,最后为每个数据添加新记录在ProcessedData表中的行。另外对应的实体记录在其他表中更新。

我看到问题的方式,它可以分解成更小的部分,我可以利用生产者/消费者模式进行数据处理: 生产者的一个线程填充共享(阻塞)队列,多个消费者从中获取数据行队列并对它们进行并行处理。消费者完成后,他们将处理后的数据放到另一个共享队列中,然后再由另一个消费者线程(单个)访问,这个线程将执行SqlBulkCopy来插入新记录。沿着这个过程,将会有其他共享队列存储更新的实体信息,而另一个消费者将获取实体的更新信息并执行更新。

问题是,即使它看起来很简单,但它在我看来是一个麻烦的方法。我确实有一种更好的方式来做我想找的事情。有关实施上述生产者/消费者模式的任何建议?或者我应该为我的问题寻找不同的设计模式?

在此先感谢

+0

当你说“共享查询”,你的意思是“共享队列”? – 2011-04-11 19:47:36

+0

是的,我的不好。感谢您的注意。更正 – Dimitri 2011-04-11 19:53:21

回答

2

你提出的解决方案听起来很合理,我不认为这是很麻烦的。理解简单,易于实施,有效且高效。它还允许您调整生产者和消费者的数量以实现最佳性能。分解成较小的部分,部件之间的通信有限,这是一件非常好的事情。

所以你有多个线程(生产者)从UDP读取数据并将这些项目存储在共享队列中。将其称为RawData队列。多个消费者从该队列中读取,处理项目并将结果放入另一个共享队列中。将其称为ProcessedData队列。最后,您有一个线程读取ProcessedData队列并将项目存储在数据库中。

.NET BlockingCollection是完美的。

这可能会有所帮助:Question on C# threading with RFID

+0

感谢Jim的回复。是的,我被分成了哪种类型的队列最适合我的需求。我正在考虑使用循环队列或双缓冲区,以最大限度地减少锁定时间。目前我知道我们没有那么多数据需要处理,但我总是在考虑可扩展性。我讨厌回去重写应用程序,当流量增加。 – Dimitri 2011-04-11 21:08:18

+0

我有一个BlockingCollection有点问题。尽管它实现了ConcurrentQueue作为它的集合基础,但我注意到,当我为消费者添加多个线程时,它们不保证从队列中顺序获取项目。我尝试了Parallel.Foreach和Parallel.For。他们似乎都不尊重项目的顺序。然后我尝试了Task.StartNew,这似乎是为了纪念这个序列。另外,我需要将消费者的输出存储到另一个队列中,需要订购。我可以使用相同的BlockingCollection并运行.OrderBy吗?或id更好使用SortedList和手动处理锁定? – Dimitri 2011-04-12 12:53:27

+0

@Dimitri:你触及很多问题。这可能是最好的,如果你把它作为另一个问题。 – 2011-04-12 14:38:34