3

我是一个坚信重新学习的学习者。以这种心态,我着手实现自定义线程池。我为自己设定的目标如下:学习实现线程池 - 使用autoresetevent时发生信号丢失的事件丢失

  1. 为了能够在线程池中排队工作项目。
  2. 能够处理具有固定数量线程的工作项 - 全部同时创建。
  3. 通用工作线程函数应该只知道如何使用deque,而不应该处理其他函数/属性,如 IsEmpty或Count。

我成功地实现了上述目标,但想验证我在专家的方法上使用了stackoverflow。另外,想知道是否有更好的方法,或者多线程专家如何解决这个问题。以下段落提到了我面临的挑战以及我如何解决这个问题。

我创建的线程池在内部维护了一个工作项目队列,所有工作线程都从中选取项目并处理它。每当一个新项目排队时,它就会发出一个事件的信号,以便任何空闲的线程都可以把它捡起来并执行它。

我开始使用autoresetevent向队列中的任何新工作项目发出等待线程的信号,但是我遇到了丢失信号事件的问题。它发生在多个项目排队并且没有空闲线程处理项目时发生。未处理的总项目与由于重叠设置(信令)事件而丢失的总信号事件相同。

为了解决丢失信号事件的问题,我在autoresetevent的基础上创建了一个包装器,并用它来代替autoresetevent。它解决了这个问题。这里是代码为同一上市:

public static class CustomThreadPool 
{ 
    static CustomThreadPool() 
    { 
     for (int i = 0; i < minThreads; i++) 
      _threads.Add(
       new Thread(ThreadFunc) { IsBackground = true } 
       ); 

     _threads.ForEach((t) => t.Start()); 
    } 

    public static void EnqueWork(Action action) 
    { 
     _concurrentQueue.Enqueue(action); 
     _enqueEvent.Set(); 
    } 

    private static void ThreadFunc() 
    { 
     Action action = null; 
     while (true) 
     { 
      _enqueEvent.WaitOne(); 
      _concurrentQueue.TryDequeue(out action); 
      action(); 
     } 
    } 

    private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>(); 
    private static List<Thread> _threads = new List<Thread>(); 
    private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent(); 
    private static object _syncObject = new object(); 
    private const int minThreads = 4; 
    private const int maxThreads = 10; 

    public static void Test() 
    { 
     CustomThreadPool.EnqueWork(() => { 

      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****First*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Second*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Third*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Fourth*****"); 
     }); 

     CustomThreadPool.EnqueWork(() => 
     { 
      for (int i = 0; i < 10; i++) Console.WriteLine(i); 
      Console.WriteLine("****Fifth*****"); 
     }); 
    } 
} 

public class CountAutoResentEvent 
{ 
    public void Set() 
    { 
     _event.Set(); 
     lock (_sync) 
      _countOfSet++; 
    } 

    public void WaitOne() 
    { 
     _event.WaitOne(); 
     lock (_sync) 
     { 
      _countOfSet--; 
      if (_countOfSet > 0) 
       _event.Set(); 
     } 
    } 

    private AutoResetEvent _event = new AutoResetEvent(false); 
    private int _countOfSet = 0; 
    private object _sync = new object(); 
} 

现在,我有几个问题:

  1. 是我的做法充分证明?
  2. 什么同步机制最适合这个问题,为什么?
  3. 多线程专家如何处理这个问题?

谢谢。

+0

@Hans Passant:在我看来,这是谈论一个不同的问题。这个问题有几点。 – Tudor 2011-12-17 09:00:29

回答

1

从我所看到的我会说这是正确的。我喜欢你已经使用ConcurrentQueue,并没有去实现你自己的同步队列。这是一团糟,很可能不会像现有的那样快。

我只想说明您的自定义“信号机制”实际上与信号量非常相似:允许多个线程进入临界区的锁。该功能已存在于Semaphore类中。

+0

@Hans Passant:他没有说这是不正确的。他问它是否正确。为什么信号不正确?基本上他实施的是带有计数器的事件。 – Tudor 2011-12-17 09:26:20