2013-06-21 75 views
1

我有下面的代码,我从源中填充用户,为了举例,如下所示。我想要做的是消费BlockingCollection与多个消费者。消费阻止具有多个任务/消费者的集合

低于正确的方式做到这一点?另外什么是最好的线程数?好吧,这将取决于硬件,内存等。或者我怎样才能以更好的方式做到这一点?

也会在下面的实现确保我会处理集合中的所有东西,直到它是空的?

class Program 
    { 
     public static readonly BlockingCollection<User> users = new BlockingCollection<User>(); 

     static void Main(string[] args) 
     { 
      for (int i = 0; i < 100000; i++) 
      { 
       var u = new User {Id = i, Name = "user " + i}; 
       users.Add(u); 
      } 

      Run(); 
     } 

     static void Run() 
     { 
      for (int i = 0; i < 100; i++) 
      { 
       Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning); 
      } 
     } 

     static void Process() 
     { 
      foreach (var user in users.GetConsumingEnumerable()) 
      { 
       Console.WriteLine(user.Id); 
      } 
     } 
    } 

    public class User 
    { 
     public int Id { get; set; } 
     public string Name { get; set; } 
    } 

回答

5

一些小东西

  1. 你从没叫过CompleteAdding,不这样做,你的消费foreach循环将永远不会完成,并挂到永远。通过在初始for循环后执行users.CompleteAdding()修复该问题。
  2. 你永远都不会等待工作完成,Run()会启动你的100个线程(除非你的真实过程需要大量的等待无争议资源,否则这可能太多了)。因为任务不是前台线程,所以当您的Main退出时,它们不会让您的程序保持打开状态。您需要一个CountdownEvent来跟踪何时完成所有事情。
  3. 在生产者完成所有工作之后,您才会启动消费者,您应该将生产者分拆到单独的线程中,或者首先启动消费者,以便他们随时准备工作,同时将生产者填充到生产者上主线程。

这里是代码的更新版本与修复

class Program 
{ 
    private const int MaxThreads = 100; //way to high for this example. 
    private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads); 
    public static readonly BlockingCollection<User> users = new BlockingCollection<User>(); 

    static void Main(string[] args) 
    { 
     Run(); 

     for (int i = 0; i < 100000; i++) 
     { 
      var u = new User {Id = i, Name = "user " + i}; 
      users.Add(u); 
     } 
     users.CompleteAdding(); 
     cde.Wait(); 
    } 

    static void Run() 
    { 
     for (int i = 0; i < MaxThreads; i++) 
     { 
      Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning); 
     } 
    } 

    static void Process() 
    { 
     foreach (var user in users.GetConsumingEnumerable()) 
     { 
      Console.WriteLine(user.Id); 
     } 
     cde.Signal(); 
    } 
} 

public class User 
{ 
    public int Id { get; set; } 
    public string Name { get; set; } 
} 

“线程的最佳数量”就像我前面说的,这真的取决于你在等待什么。

如果你正在处理的是CPU限制,最佳线程数可能是Enviorment.ProcessorCount

如果你正在做的是等待一个外部资源,但新的要求,不影响老的请求(例如询问信息的20台不同的服务器,服务器对服务器n负载不会影响服务器n+1负载)的那种情况下,我会让Parallel.ForEach只选择你的线程数。

如果您正在等待被争用的资源(例如读取/写入硬盘),则根本不想使用非常多的线程(可能甚至只使用一个)。我刚刚发布了a answer in another question,从硬盘读入数据时,应该只使用一个线程,这样硬盘就不会四处跳跃,试图一次完成所有读取操作。

+0

如何调用cde.Signal();来自Process(),你的意思是让这个静态只读实例? – DarthVader

+0

是的,我做了,我在这里写了代码,而不是在调试器中。我会纠正的。 –

+0

我正在使用IO,即:从交换/ AD获取用户信息,但我有这么多的用户,我需要一些并行性,这种模式会有效吗?或者你会推荐另一种方法? – DarthVader