2017-07-18 39 views
-1

(好吧,我已经完全搞砸了,所以我要清理它并提供我可以在这里提供的所有信息。请参阅此底部我希望有人能看到我所做的一切,并学会如何从我的错误中发布更好的文章:))通过Parallel.ForEach保留单个后台线程

请参阅标题为“更好的解释”一节,以获得更好的解释。


编辑2:我不够明确道歉。在这种情况下,ItemStore不是一个集合,它是一个DB支持的服务。我更新了我的代码。


编辑:附加信息。

  • 后备存储将成为数据库。这意味着我们可以将项目保留在队列中,而不必担心应用程序死亡时丢失项目。这也意味着从数据库添加/检索项目可能会很慢。 (即,不如内存中那么快)。

  • 正因为如此,这也意味着我们不想在整个过程中保存一个集合。首选项是返回到数据库的下一个项目,再次为了持久性安全。

  • 最终,物品来自网络服务呼叫。本质上,Enqueue将是浏览器将执行HTTP POST的WebAPI路由。

  • 最后,我们试图解决的中心问题是将潜在的一堆请求集中到单个FIFO队列中,这主要是由于我们使用的第三方库的限制。因此,我们的目标是获得10个并发请求并逐个处理它们。

我不知道附加信息有多大帮助,但它在那里。 :)


我想创建一个简单的处理队列。在Enqueue上,它将商品添加到商店,然后检查处理线程是否已在运行。如果是,就完成了;如果不是,它会启动运行队列的线程。

队列线程本身查询商店的下一个项目并处理它。然后它查询商店的下一个商品,并继续前进,直到商品用完。然后停止处理并关闭直到下一个项目入队。

的代码基本上是这样的:

// Service that saves and retrieves items from a database 
private IItemService _service; 

// Item processor 
private IItemProcessor _itemProcessor; 

public void Enqueue(object item) 
{ 
    _service.Save(item); 
    if (_isRunning) 
    { 
     // If the queue is processing, the item just gets added to the DB and 
     // the processing function will pull it off of the DB when needed. 
     return; 
    } 

    // If it's not already running, process whatever queue items are in the DB. 
    ProcessQueue(); 
} 

private void ProcessQueue() 
{ 
    _workerThread = new ThreadStart(ProcessQueueInternal); 
    _workerThread.Start(); 
} 

private void ProcessQueueInternal() 
{ 
    // _service.GetNextItem() retrieves an item from the DB based on several 
    // factors, including whether another instance of a queue has claimed it, 
    // priority, etc. 
    object item; 
    while (item = _service.GetNextItem()) != null) 
    { 
     _itemProcessor.ProcessItem(item); 
    } 

    // No more items in the DB, so the queue should sit idle until a new 
    // item is enqueued. 
    _isRunning = false; 
} 

我使用的是Parallel.ForEach()循环测试出这个队列,像这样:

Parallel.ForEach(myItems, item => Enqueue(item)); 

我遇到的问题是,偶尔队列发射两次,我想避免。不是经常性的,但它足以让我想阻止这种情况的发生。

我该如何解决这个问题?有可能多个项目会同时入队,我需要确保一次只运行一个后台线程。最简单的方法就是这样吗?

private void ProcessQueue() 
{ 
    if (_workerThread == null || (_workerThread != null && _workerThread.IsAlive)) 
    { 
     _workerThread = new ThreadStart(ProcessQueueInternal); 
     _workerThread.Start(); 
    } 
} 

或者还有更好的方法吗?简单是这里的目标,仅次于效率。


一个更好的解释

目标:漏斗一堆HTTP请求的,可以在使过程中被在单个线程运行的方式触发长期运行的进程。该工作流程如下:

  1. 用户发送一个HTTP POST(从角部位我们也在发展),我们都需要执行它,包括执行路线的所有信息。

  2. POST命中WebAPI ApiController,它自己调用服务。

    • 该服务本身通过Unity实例化为ContainerControlledLifetimeManager,所以它始终在后台运行。 (我们已经测试过这种情况。)
  3. 服务补充说,通过的EntityFramework进来从POST到数据库表中的数据。

    • 如果服务已经在处理项目,它只是停在这里。
    • 如果服务不处理项目,它开始这样做。
  4. 服务通过从数据库中一次一个地检索它们来处理项目。每个项目都是通过获取所有数据并发送HTTP POST到另一个服务来处理的(它启动了一个不能真正同时运行的进程,这是我们使用的库的限制),然后等待它去完成。一旦完成,它将该项目的状态设置为成功/错误,然后从数据库获取下一个项目并重复,直到数据库中没有更多项目要处理。

  5. 根据优先级从数据库中选择项目,以及它是否已在之前运行(即状态为InQueue而不是成功/错误)。

有三个好处在这里,我们的目标为:

  1. 随着DB作为后备存储,我们对凡申请死亡出于某种原因情况下,一些安全性。

  2. 队列不需要在用完要处理的项目时继续轮询数据库。它只是呆在那里,直到一个新项目入队,在这种情况下,整个过程再次启动。

  3. 由于内部没有备份收集功能,因此当数据从DB移出并且应用程序由于某种原因死亡时,我们无需担心数据丢失。这与#1有关。

我们最大的危险 - 我跑入问题 - 是最终的入口点在这里是一个网站,并在该网站上的一个按钮。所以100人完全有可能同时击中按钮,并且最终这个混乱结束的过程必须以连续方式运行。所以我们需要将所有这些请求汇集到单个文件行。结果,整个队列应该由一个线程处理。这里我使用一个名为_workerThread的线程。我遇到的问题是确保_workerThread被实例化,并在任何周期启动一次。那就是:

  • 队列正在处理一个新项目进来:不启动一个新线程
  • 队列正在处理中,并且有新项目进入:开始一个新线程

我能想到的模拟多个用户的唯一方法是通过Parallel.ForEach。我将在下面解释我的测试方法。

代码:队列服务的更新代码在上面。具体而言,EnqueueProcessQueueProcessQueueInternal是导致我的问题的相关部分。我已经将它们更新为尽可能清楚。最终,它们包含两个主要部分:

  • _service是一个单独的项目服务,负责简单的保存,删除和更新方法,以及拉动下一个项目从队列中。它通过依赖注入插入到队列中。

  • _itemProcessor是一个单独的类负责处理项目。在现实世界中,它会创建一个HttpClient并在项目数据中激发请求。我把它分开大部分,所以我可以创建一个假的单元测试没有数据库的队列。

测试:我试图通过单元测试来测试这一点,因为我们还没有对UI挂钩必要在现实世界中来测试这一点。要做到这一点,我做了“假”版本的项目服务和项目处理器:

  • 的假物品服务只是存储在List<WebRequestQueueItem>新队列项目。这可能是我的问题的原因,现在我想到了,但我不确定。我有点害怕,为虚假服务使用某种线程安全的集合将为现实世界的问题增加一个“修复”(因为当队列实际在单元测试之外使用时,它将使用数据库作为其后备存储)。

  • 假冒物品处理器在1500ms时间内只是做Thread.Sleep。它在那里模拟正在采取的终极行动将需要一段时间。

要模拟多个人同时打开服务器,我正在使用Parallel.ForEach()。我不知道有更好的方法来模拟这个。

的问题:最后的问题是,Parallel.ForEach()循环增加项目一次性全部项目的服务,但它这样做的速度不够快队列没有时间认识到,项目已被处理。所以它从另一个_workerThread开始,这正是我不希望它做的。

我的怀疑是,这是一般的过程被打破,而不是我在本例中使用List作为后备存储的事实。不知何故,我需要确保如果项目添加得非常快,或者如果数十人一次将所有项目添加到队列中,队列的多个实例不会被启动。我发现,一旦队列开始运行,一切正常 - 新的项目可以添加到它,他们会得到处理,当服务到达它。但这是最初的开始,这导致我的问题。

有关数据库服务本身的说明:它使用EntityFramework以及用于添加/更新/删除项目的标准方法。这些模式在我们的整个产品中都是相同的,而且我们没有遇到任何我知道的问题。不过,这些方法是这样的:

添加

_context.WebRequestQueueItems.Add(someItemEntity); 
_context.SaveChanges(); 

更新

_context.WebRequestQueueItems.AddOrUpdate(someItemEntity); 
_context.SaveChanges(); 

删除

_context.WebRequestQueueItems.Remove(someItemEntity); 
_context.SaveChanges(); 

GetNextItem(大致;该条款是稍微比这更复杂,但你的想法)

return _context 
     .WebRequestQueueItems 
     .OrderByDescending(item => item.Priority) 
     .FirstOrDefault(); 
+2

您的代码不是线程安全的。停止发明轮子并使用像System.Collections.Concurrent.BlockingCollection这样的内置类或相似的类。 –

+0

@ L.B我在帖子的顶部添加了更多信息,但我故意避免使用内部集合来管理队列。目标是将一堆请求汇集到单行文件中,使用数据库来保存持久性请求。不过,我会研究'BlockingCollection'。 –

+1

'故意试图避免使用内部集合来管理队列'你正在做的就是这个。所以我仍然坚持BlockingCollection。但还有其他专业解决方案,如MSMQ RabbitMQ IBM的MQ等(事实上,如果你知道你在做什么,它是简单的生产者 - 消费者问题) –

回答

1

对于初学者来说,尝试这样的代码:

private static object _gate = new object(); 

private void ProcessQueue() 
{ 
    if (_workerThread == null || (_workerThread != null && _workerThread.IsAlive)) 
    { 
     lock (_gate) 
     { 
      if (_workerThread == null || (_workerThread != null && _workerThread.IsAlive)) 
      { 
       _workerThread = new ThreadStart(ProcessQueueInternal); 
       _workerThread.Start(); 
      } 
     } 
    } 
} 

此代码将防止两个线程开始的同时,但它并不能阻止线程在第一个if之后但在第二个之前闲置的情况。您必须确保您在多个地方拨打电话ProcessQueue,以确保您的队列不会停止。

+0

这似乎已经成功了。谢谢! –