2009-12-16 104 views
13

我正在写一个简单的应用程序(对于我的妻子不低于:-P),它可以为大量图像做一些图像处理(调整大小,时间戳等)。所以我正在编写一个可以同步和异步执行此操作的库。我决定使用Event-based Asynchronous Pattern。当使用这种模式时,您需要在工作完成时提出事件。这是我知道何时完成的问题。所以基本上,我DownsizeAsync方法我在做这样的事情(缩编图像异步方法):让多个线程工作并等待他们全部完成的最佳方式是什么?

public void DownsizeAsync(string[] files, string destination) 
    { 
     foreach (var name in files) 
     { 
      string temp = name; //countering the closure issue 
      ThreadPool.QueueUserWorkItem(f => 
      { 
       string newFileName = this.DownsizeImage(temp, destination); 
       this.OnImageResized(newFileName); 
      }); 
     } 
    } 

都完成时,他们最棘手的部分,现在是知道。

下面是我考虑过的:使用像这里的ManualResetEvents:http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx但我遇到的问题是,你只能等待64或更少的事件。我可能有许多更多的图像。

第二个选项:有一个统计已完成的图像的计数器,并引发该事件当数达到总:

public void DownsizeAsync(string[] files, string destination) 
{ 
    foreach (var name in files) 
    { 
     string temp = name; //countering the closure issue 
     ThreadPool.QueueUserWorkItem(f => 
     { 
      string newFileName = this.DownsizeImage(temp, destination); 
      this.OnImageResized(newFileName); 
      total++; 
      if (total == files.Length) 
      { 
       this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null)); 
      } 
     }); 
    } 


} 

private volatile int total = 0; 

现在这种感觉“哈克”,我不能完全肯定如果这是线程安全的。

所以,我的问题是,这样做的最好方法是什么?是否有另一种方法来同步所有线程?我应该不使用ThreadPool吗?谢谢!!

UPDATE基于评价和从几个答案,我决定采取这种方式反馈:

首先,我创建了一个批处理到枚举“批”的扩展方法:

public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount) 
    { 
     for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount)) 
     { 
      yield return s.Take(batchCount); 
     } 
    } 

基本上,如果你做这样的事情:

 foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10)) 
     { 
      foreach (int i in batch) 
      { 
       Console.Write("{0} ", i); 
      } 
      Console.WriteLine(); 
     } 

你得到这样的输出:

1 2 3 4 5 6 7 8 9 10 
11 12 13 14 15 16 17 18 19 20 
21 22 23 24 25 26 27 28 29 30 
31 32 33 34 35 36 37 38 39 40 
41 42 43 44 45 46 47 48 49 50 
51 52 53 54 55 56 57 58 59 60 
61 62 63 64 65 66 67 68 69 70 
71 72 73 74 75 76 77 78 79 80 
81 82 83 84 85 86 87 88 89 90 
91 92 93 94 95 

这个想法是(作为评论中的某些人指出),没有必要为每个图像创建一个单独的线程。因此,我会将图像批量加入[machine.cores * 2]。然后,我将使用我的第二种方法,这只是为了让柜台继续前进,当柜台达到我期待的总数时,我就知道我已经完成了。

我之所以现在,它实际上是线程安全的确信,是因为我已标记的总变量作为挥发性它根据MSDN

volatile修饰符通常用于 了字段,通过 多线程访问,而不使用 锁语句来序列化访问。 使用volatile修饰符确保 一个线程检索最 跟上时代的价值被另一个线程 书面

意味着我应该在明确的(如果不是,请让我知道!)

因此,这里是我跟去的代码:

public void DownsizeAsync(string[] files, string destination) 
    { 
     int cores = Environment.ProcessorCount * 2; 
     int batchAmount = files.Length/cores; 

     foreach (var batch in files.GetBatches(batchAmount)) 
     { 
      var temp = batch.ToList(); //counter closure issue 
      ThreadPool.QueueUserWorkItem(b => 
      { 
       foreach (var item in temp) 
       { 
        string newFileName = this.DownsizeImage(item, destination); 
        this.OnImageResized(newFileName); 
        total++; 
        if (total == files.Length) 
        { 
         this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null)); 
        } 
       } 
      }); 
     } 
    } 

我接受反馈,我绝不是在多线程方面的专家,因此,如果有人看到任何问题与此,或有一个更好的主意,请让我知道。 (是的,这只是一个自制的应用程序,但我对如何使用我在这里获得的知识来改进我们在工作中使用的搜索/索引服务有一些想法。)现在,我会将此问题保持开放直到我感觉我正在使用正确的方法。谢谢大家的帮助。

+3

总++看起来没有线程安全! – RichardOD 2009-12-16 15:05:30

+2

计数器方法具有很高的可扩展性。只需使用Interlocked.Increment和.Decrement使其线程安全。 – 2009-12-16 15:39:42

+0

实际上,64是这个限制吗?即使使用64个物理内核,您也会遇到内存和磁盘瓶颈问题,并行访问会降低速度。但是,这些可能会在一两年内消失。 – peterchen 2009-12-16 15:54:57

回答

11

最简单的事情就是创建新线程,然后在每个线程上调用Thread.Join。你可能使用信号量或类似的东西 - 但它可能更容易创建新的线程。

在.NET 4.0中,您可以使用并行扩展来轻松完成任务。

作为另一种替代其使用线程池,你可以创建一个委托,并调用BeginInvoke它,返回一个IAsyncResult - 那么你可以通过AsyncWaitHandle财产得到WaitHandle每个结果,并调用WaitHandle.WaitAll

编辑:正如在评论中指出的那样,在一些实现中,一次最多只能调用WaitAll,最多有64个句柄。替代方案可以依次调用WaitOne,或者分批拨打WaitAll。只要你是从一个不会阻塞线程池的线程来完成,它并不重要。另请注意,您不能从STA线程调用WaitAll

+2

乔恩 - 你忘记了WaitAll的64限制。 – RichardOD 2009-12-16 15:08:11

+0

@RichardOD:谢谢,已经添加了一个关于这个的注释。 – 2009-12-16 15:12:52

2

.net 4.0使得多线程更容易(尽管你仍然可以用副作用拍摄自己)。

+1

更容易?多线程并不容易! – RichardOD 2009-12-16 15:08:55

+0

好的... C中的多线程更难。理论上图灵等价物或者你有什么,在概念上相似(如果不相同),但额外的代码行会理解核心。 – 2009-12-16 16:03:11

2

我用SmartThreadPool有很多成功来解决这个问题。还有一个关于装配的Codeplex网站。

SmartThreadPool可以帮助解决其他问题,就像某些线程不能同时运行而其他问题一样。

1

另一种选择是使用管道。

您将所有要完成的工作发布到管道,然后从每个线程的管道中读取数据。当管道是空的,你完成了,线程自行结束,每个人都很高兴(当然要确保你先完成所有的工作,然后消耗掉)

11

你仍然想使用ThreadPool,因为它会管理它同时运行的线程数量。我最近遇到了类似的问题,解决它像这样:

var dispatcher = new ThreadPoolDispatcher(); 
dispatcher = new ChunkingDispatcher(dispatcher, 10); 

foreach (var image in images) 
{ 
    dispatcher.Add(new ResizeJob(image)); 
} 

dispatcher.WaitForJobsToFinish(); 

的IDispatcher和IJob是这样的:

public interface IJob 
{ 
    void Execute(); 
} 

public class ThreadPoolDispatcher : IDispatcher 
{ 
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>(); 

    public void Dispatch(IJob job) 
    { 
     var resetEvent = CreateAndTrackResetEvent(); 
     var worker = new ThreadPoolWorker(job, resetEvent); 
     ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback)); 
    } 

    private ManualResetEvent CreateAndTrackResetEvent() 
    { 
     var resetEvent = new ManualResetEvent(false); 
     resetEvents.Add(resetEvent); 
     return resetEvent; 
    } 

    public void WaitForJobsToFinish() 
    { 
     WaitHandle.WaitAll(resetEvents.ToArray() ?? new ManualResetEvent[] { }); 
     resetEvents.Clear(); 
    } 
} 

然后使用装饰来块使用线程池:

public class ChunkingDispatcher : IDispatcher 
{ 
    private IDispatcher dispatcher; 
    private int numberOfJobsDispatched; 
    private int chunkSize; 

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize) 
    { 
     this.dispatcher = dispatcher; 
     this.chunkSize = chunkSize; 
    } 

    public void Dispatch(IJob job) 
    { 
     dispatcher.Dispatch(job); 

     if (++numberOfJobsDispatched % chunkSize == 0) 
      WaitForJobsToFinish(); 
    } 

    public void WaitForJobsToFinish() 
    { 
     dispatcher.WaitForJobsToFinish(); 
    } 
} 

IDispatcher抽象对于交换线程技术非常有效。我有另一个实现是SingleThreadedDispatcher,你可以创建一个像Jon Skeet建议的ThreadStart版本。然后很容易运行每一个,看看你得到什么样的表现。 SingleThreadedDispatcher在调试代码或者不想杀死你的盒子上的处理器时是很好的。

编辑:我忘了添加代码ThreadPoolWorker:在我的主要然后

public class ThreadPoolWorker 
{ 
    private IJob job; 
    private ManualResetEvent doneEvent; 

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent) 
    { 
     this.job = job; 
     this.doneEvent = doneEvent; 
    } 

    public void ThreadPoolCallback(object state) 
    { 
     try 
     { 
      job.Execute(); 
     } 
     finally 
     { 
      doneEvent.Set(); 
     } 
    } 
} 
2

我使用一个静态工具方法来检查所有的单个等待处理..

public static void WaitAll(WaitHandle[] handles) 
    { 
     if (handles == null) 
      throw new ArgumentNullException("handles", 
       "WaitHandle[] handles was null"); 
     foreach (WaitHandle wh in handles) wh.WaitOne(); 
    } 

线程,我创建这些等待句柄的列表,并且对于我放入我的ThreadPool队列的每个委托,我将等待句柄添加到列表中...

List<WaitHandle> waitHndls = new List<WaitHandle>(); 
foreach (iterator logic) 
{ 
     ManualResetEvent txEvnt = new ManualResetEvent(false); 

     ThreadPool.QueueUserWorkItem(
      delegate 
       { 
        try { // Code to process each task... } 
        // Finally, set each wait handle when done 
        finally { lock (locker) txEvnt.Set(); } 
       }); 
     waitHndls.Add(txEvnt); // Add wait handle to List 
} 
util.WaitAll(waitHndls.ToArray()); // Check all wait Handles in List 
5

最简单有效的解决方案是使用计数器并使它们安全。这将消耗更少的内存,可扩展到更多数量的线程

这里是一个样本

int itemCount = 0; 
for (int i = 0; i < 5000; i++) 
{ 
    Interlocked.Increment(ref itemCount); 

    ThreadPool.QueueUserWorkItem(x=>{ 
     try 
     { 
      //code logic here.. sleep is just for demo 
      Thread.Sleep(100); 
     } 
     finally 
     { 
      Interlocked.Decrement(ref itemCount); 
     } 
    }); 
} 

while (itemCount > 0) 
{ 
    Console.WriteLine("Waiting for " + itemCount + " threads..."); 
    Thread.Sleep(100); 
} 
Console.WriteLine("All Done!"); 
+1

+1。我使用这种模式,虽然我检查了Interlocked.Decrement()的结果,看看我们是否已经命中了零并且设置了一个事件,如果是的话表示所有的项目都已完成。这样你就不需要在itemCount上进行轮询。 – 2009-12-16 20:26:47

+0

我喜欢这种方法,但是我想知道如果我们正在增加的变量被标记为volatile,那么这是否需要。 – BFree 2009-12-17 05:02:10

+1

volatile确保变量不被缓存,并且访问器是原子的,但不能保证read + change + write的完整操作将是原子的 – 2009-12-17 06:51:46

1

我建议把不变的图像在队列中,当你从队列中读取启动一个线程和插入它的System.Threading.Thread.ManagedThreadId属性和文件名一起转换成字典。这样您的用户界面可以列出待处理文件和活动文件。

当每个线程完成时,它调用一个回调例程,传回它的ManagedThreadId。此回调(作为线程的委托传递)从字典中删除线程的ID,从队列中启动另一个线程并更新UI。

当队列和字典都为空时,就完成了。

稍微复杂一些,但这样你可以得到一个响应式UI,你可以很容易地控制活动线程的数量,并且你可以看到有什么在飞行中。收集统计。使用WPF,并为每个文件设置进度条。她不禁对此印象深刻。

相关问题