(好吧,我已经完全搞砸了,所以我要清理它并提供我可以在这里提供的所有信息。请参阅此底部我希望有人能看到我所做的一切,并学会如何从我的错误中发布更好的文章:))通过Parallel.ForEach保留单个后台线程
请参阅标题为“更好的解释”一节,以获得更好的解释。
编辑2:我不够明确道歉。在这种情况下,ItemStore
不是一个集合,它是一个DB支持的服务。我更新了我的代码。
编辑:附加信息。
后备存储将成为数据库。这意味着我们可以将项目保留在队列中,而不必担心应用程序死亡时丢失项目。这也意味着从数据库添加/检索项目可能会很慢。 (即,不如内存中那么快)。
正因为如此,这也意味着我们不想在整个过程中保存一个集合。首选项是返回到数据库的下一个项目,再次为了持久性安全。
最终,物品来自网络服务呼叫。本质上,Enqueue将是浏览器将执行HTTP POST的WebAPI路由。
最后,我们试图解决的中心问题是将潜在的一堆请求集中到单个FIFO队列中,这主要是由于我们使用的第三方库的限制。因此,我们的目标是获得10个并发请求并逐个处理它们。
我不知道附加信息有多大帮助,但它在那里。 :)
我想创建一个简单的处理队列。在Enqueue
上,它将商品添加到商店,然后检查处理线程是否已在运行。如果是,就完成了;如果不是,它会启动运行队列的线程。
队列线程本身查询商店的下一个项目并处理它。然后它查询商店的下一个商品,并继续前进,直到商品用完。然后停止处理并关闭直到下一个项目入队。
的代码基本上是这样的:
// Service that saves and retrieves items from a database
private IItemService _service;
// Item processor
private IItemProcessor _itemProcessor;
public void Enqueue(object item)
{
_service.Save(item);
if (_isRunning)
{
// If the queue is processing, the item just gets added to the DB and
// the processing function will pull it off of the DB when needed.
return;
}
// If it's not already running, process whatever queue items are in the DB.
ProcessQueue();
}
private void ProcessQueue()
{
_workerThread = new ThreadStart(ProcessQueueInternal);
_workerThread.Start();
}
private void ProcessQueueInternal()
{
// _service.GetNextItem() retrieves an item from the DB based on several
// factors, including whether another instance of a queue has claimed it,
// priority, etc.
object item;
while (item = _service.GetNextItem()) != null)
{
_itemProcessor.ProcessItem(item);
}
// No more items in the DB, so the queue should sit idle until a new
// item is enqueued.
_isRunning = false;
}
我使用的是Parallel.ForEach()
循环测试出这个队列,像这样:
Parallel.ForEach(myItems, item => Enqueue(item));
我遇到的问题是,偶尔队列发射两次,我想避免。不是经常性的,但它足以让我想阻止这种情况的发生。
我该如何解决这个问题?有可能多个项目会同时入队,我需要确保一次只运行一个后台线程。最简单的方法就是这样吗?
private void ProcessQueue()
{
if (_workerThread == null || (_workerThread != null && _workerThread.IsAlive))
{
_workerThread = new ThreadStart(ProcessQueueInternal);
_workerThread.Start();
}
}
或者还有更好的方法吗?简单是这里的目标,仅次于效率。
一个更好的解释
目标:漏斗一堆HTTP请求的,可以在使过程中被在单个线程运行的方式触发长期运行的进程。该工作流程如下:
用户发送一个HTTP POST(从角部位我们也在发展),我们都需要执行它,包括执行路线的所有信息。
POST命中WebAPI
ApiController
,它自己调用服务。- 该服务本身通过Unity实例化为
ContainerControlledLifetimeManager
,所以它始终在后台运行。 (我们已经测试过这种情况。)
- 该服务本身通过Unity实例化为
服务补充说,通过的EntityFramework进来从POST到数据库表中的数据。
- 如果服务已经在处理项目,它只是停在这里。
- 如果服务不处理项目,它开始这样做。
服务通过从数据库中一次一个地检索它们来处理项目。每个项目都是通过获取所有数据并发送HTTP POST到另一个服务来处理的(它启动了一个不能真正同时运行的进程,这是我们使用的库的限制),然后等待它去完成。一旦完成,它将该项目的状态设置为成功/错误,然后从数据库获取下一个项目并重复,直到数据库中没有更多项目要处理。
根据优先级从数据库中选择项目,以及它是否已在之前运行(即状态为InQueue而不是成功/错误)。
有三个好处在这里,我们的目标为:
随着DB作为后备存储,我们对凡申请死亡出于某种原因情况下,一些安全性。
队列不需要在用完要处理的项目时继续轮询数据库。它只是呆在那里,直到一个新项目入队,在这种情况下,整个过程再次启动。
由于内部没有备份收集功能,因此当数据从DB移出并且应用程序由于某种原因死亡时,我们无需担心数据丢失。这与#1有关。
我们最大的危险 - 我跑入问题 - 是最终的入口点在这里是一个网站,并在该网站上的一个按钮。所以100人完全有可能同时击中按钮,并且最终这个混乱结束的过程必须以连续方式运行。所以我们需要将所有这些请求汇集到单个文件行。结果,整个队列应该由一个线程处理。这里我使用一个名为_workerThread
的线程。我遇到的问题是确保_workerThread
被实例化,并在任何周期启动一次。那就是:
- 队列正在处理一个新项目进来:不启动一个新线程。
- 队列正在处理中,并且有新项目进入:开始一个新线程。
我能想到的模拟多个用户的唯一方法是通过Parallel.ForEach
。我将在下面解释我的测试方法。
代码:队列服务的更新代码在上面。具体而言,Enqueue
,ProcessQueue
和ProcessQueueInternal
是导致我的问题的相关部分。我已经将它们更新为尽可能清楚。最终,它们包含两个主要部分:
_service
是一个单独的项目服务,负责简单的保存,删除和更新方法,以及拉动下一个项目从队列中。它通过依赖注入插入到队列中。_itemProcessor
是一个单独的类负责处理项目。在现实世界中,它会创建一个HttpClient
并在项目数据中激发请求。我把它分开大部分,所以我可以创建一个假的单元测试没有数据库的队列。
测试:我试图通过单元测试来测试这一点,因为我们还没有对UI挂钩必要在现实世界中来测试这一点。要做到这一点,我做了“假”版本的项目服务和项目处理器:
的假物品服务只是存储在
List<WebRequestQueueItem>
新队列项目。这可能是我的问题的原因,现在我想到了,但我不确定。我有点害怕,为虚假服务使用某种线程安全的集合将为现实世界的问题增加一个“修复”(因为当队列实际在单元测试之外使用时,它将使用数据库作为其后备存储)。假冒物品处理器在1500ms时间内只是做
Thread.Sleep
。它在那里模拟正在采取的终极行动将需要一段时间。
要模拟多个人同时打开服务器,我正在使用Parallel.ForEach()
。我不知道有更好的方法来模拟这个。
的问题:最后的问题是,Parallel.ForEach()
循环增加项目一次性全部项目的服务,但它这样做的速度不够快队列没有时间认识到,项目已被处理。所以它从另一个_workerThread
开始,这正是我不希望它做的。
我的怀疑是,这是一般的过程被打破,而不是我在本例中使用List
作为后备存储的事实。不知何故,我需要确保如果项目添加得非常快,或者如果数十人一次将所有项目添加到队列中,队列的多个实例不会被启动。我发现,一旦队列开始运行,一切正常 - 新的项目可以添加到它,他们会得到处理,当服务到达它。但这是最初的开始,这导致我的问题。
有关数据库服务本身的说明:它使用EntityFramework以及用于添加/更新/删除项目的标准方法。这些模式在我们的整个产品中都是相同的,而且我们没有遇到任何我知道的问题。不过,这些方法是这样的:
添加
_context.WebRequestQueueItems.Add(someItemEntity);
_context.SaveChanges();
更新
_context.WebRequestQueueItems.AddOrUpdate(someItemEntity);
_context.SaveChanges();
删除
_context.WebRequestQueueItems.Remove(someItemEntity);
_context.SaveChanges();
GetNextItem(大致;该条款是稍微比这更复杂,但你的想法)
return _context
.WebRequestQueueItems
.OrderByDescending(item => item.Priority)
.FirstOrDefault();
您的代码不是线程安全的。停止发明轮子并使用像System.Collections.Concurrent.BlockingCollection这样的内置类或相似的类。 –
@ L.B我在帖子的顶部添加了更多信息,但我故意避免使用内部集合来管理队列。目标是将一堆请求汇集到单行文件中,使用数据库来保存持久性请求。不过,我会研究'BlockingCollection'。 –
'故意试图避免使用内部集合来管理队列'你正在做的就是这个。所以我仍然坚持BlockingCollection。但还有其他专业解决方案,如MSMQ RabbitMQ IBM的MQ等(事实上,如果你知道你在做什么,它是简单的生产者 - 消费者问题) –