2010-07-16 84 views
0

我想我可能需要重新考虑我的设计。我很难缩小一个导致我的电脑完全挂起的错误,有时会抛出VS 2010的HRESULT 0x8007000E。线程池/ WaitHandle资源泄漏/崩溃

我有一个控制台应用程序(我将稍后转换为服务)基于数据库队列传输文件。

我限制了允许传输的线程。这是因为我们连接的某些系统只能包含来自特定帐户的一定数量的连接。

例如,系统A只能接受3个同时连接(这意味着3个独立的线程)。这些线程中的每一个都有自己独特的连接对象,所以我们不应该遇到任何同步问题,因为它们没有共享连接。

我们想循环处理来自这些系统的文件。因此,例如,我们将允许3个连接,每个连接最多可传输100个文件。这意味着,要从系统A移动1000个文件,我们每个周期只能处理300个文件,因为每个文件允许3个线程,每个文件有100个文件。因此,在这个转移的整个生命周期中,我们将有10个线程。我们一次只能运行3个。所以,会有3个周期,最后一个周期只会使用1个线程来传输最后的100个文件。 (3个线程×100个=文件每循环300名的文件)

现行的结构通过例子是:

  1. 甲System.Threading.Timer检查每5秒的东西通过调用GetScheduledTask做队列()
  2. 如果没有什么,GetScheduledTask()简单地什么也不做
  3. 如果有工作,创建一个线程池线程来处理工作[工作线程A]
  4. 工作线程A看到,有1000个文件传输
  5. 工作线程A看到,它只能有3个线程中运行的系统中,它从
  6. 工作线程A获取文件启动三个新的工作线程[B,C,d]和转让
  7. 工作线程用于等待B,C,d [WaitHandle.WaitAll(transfersArray)]
  8. 工作线程A看到有队列更文件(应该是700现在)
  9. 工作线程A创建一个新的阵列上等待[transfersArray = new TransferArray[3]这是最大的系统A,但可能会在系统上有所不同
  10. 工作线程A启动三个新工作线程[B,C,D]并等待它们[WaitHandle.WaitAll(transfersArray)]
  11. 该过程重复,直到没有更多的文件要移动。
  12. ,它是做

工作线程A信号,我使用的ManualResetEvent来处理信号。

我的问题是:

  1. 是否有任何明显的情况下,这将导致该我遇到资源泄漏或问题?
  2. 我应该循环数组通每一个WaitHandle.WaitAll(array)后,并呼吁array[index].Dispose()?
  3. 任务管理器下句柄计数此过程中慢慢爬行
  4. 我打电话工作者线程A的从的System.Threading初始创建。计时器。这会有什么问题吗?该定时器的代码是:

(用于调度一些类代码)

private ManualResetEvent _ResetEvent; 

private void Start() 
{ 
    _IsAlive = true; 
    ManualResetEvent transferResetEvent = new ManualResetEvent(false); 
    //Set the scheduler timer to 5 second intervals 
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000); 
} 

private void ScheduledTasks_Tick(object state) 
{ 
    ManualResetEvent resetEvent = null; 
    try 
    { 
     resetEvent = (ManualResetEvent)state; 
     //Block timer until GetScheduledTasks() finishes 
     _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite); 
     GetScheduledTasks(); 
    } 
    finally 
    { 
     _ScheduledTasks.Change(5000, 5000); 
     Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff")); 
     resetEvent.Set(); 
    } 
} 


private void GetScheduledTask() 
{ 
    try 
    { 
     //Check to see if the database connection is still up 
     if (!_IsAlive) 
     { 
      //Handle 
      _ConnectionLostNotification = true; 
      return; 
     } 

     //Get scheduled records from the database 
     ISchedulerTask task = null; 

     using (DataTable dt = FastSql.ExecuteDataTable(
       _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure, 
       new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class 
     { 
      if (dt != null) 
      { 
       if (dt.Rows.Count == 1) 
       { //Only 1 row is allowed 
        DataRow dr = dt.Rows[0]; 

        //Get task information 
        TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString()); 
        task = ScheduledTaskFactory.CreateScheduledTask(taskType); 

        task.Description = dr["Description"].ToString(); 
        task.IsEnabled = (bool)dr["IsEnabled"]; 
        task.IsProcessing = (bool)dr["IsProcessing"]; 
        task.IsManualLaunch = (bool)dr["IsManualLaunch"]; 
        task.ProcessMachineName = dr["ProcessMachineName"].ToString(); 
        task.NextRun = (DateTime)dr["NextRun"]; 
        task.PostProcessNotification = (bool)dr["NotifyPostProcess"]; 
        task.PreProcessNotification = (bool)dr["NotifyPreProcess"]; 
        task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString()); 
        task.SleepMinutes = (int)dr["SleepMinutes"]; 
        task.ScheduleId = (int)dr["ScheduleId"]; 
        task.CurrentRuns = (int)dr["CurrentRuns"]; 
        task.TotalRuns = (int)dr["TotalRuns"]; 

        SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task); 
        //Queue up task to worker thread and start 
        ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);  
       } 
      } 
     } 

    } 
    catch (Exception ex) 
    { 
     //Handle 
    } 
} 

private void ThreadProc(object taskObject) 
{ 
    SchedulerTask task = (SchedulerTask)taskObject; 
    ScheduledTaskEngine engine = null; 
    try 
    { 
     engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString); 
     engine.StartTask(task.Task);  
    } 
    catch (Exception ex) 
    { 
     //Handle 
    } 
    finally 
    { 
     task.TaskResetEvent.Set(); 
     task.TaskResetEvent.Dispose(); 
    } 
} 
+0

好像这是一个编码错误,与声明重置事件数组有关。我是做 'ManualResetEvent的[]事件=新ManualResetEvents [统计]' 而不是 '的WaitHandle [] =事件的WaitHandle新[计]' – 2010-07-19 18:16:01

回答

0

事实证明,这个奇怪问题的来源与体系结构无关,而是因为将解决方案从3.5转换为4.0。我重新创建了解决方案,不进行代码更改,并且问题再也没有发生过。

2

0x8007000E是存储器外的错误。这和处理计数似乎指向资源泄漏。确保你正在处理实现IDisposable的每个对象。这包括您正在使用的ManualResetEvent的阵列。

如果您有时间,您可能还想转换为使用.NET 4.0 Task类;它被设计来处理像这样更复杂的场景。通过定义子对象,可以减少总体线程数(线程非常昂贵,不仅因为调度,还因为它们的堆栈空间)。

0

我认为你应该重新考虑你的架构。您只能同时连接3个事实几乎要求您使用1个线程来生成文件列表和3个线程来处理它们。您的生产者线程会将所有文件插入到队列中,并且3个使用者线程将出队并继续处理,因为物品到达队列中。阻塞队列可以显着简化代码。如果您使用.NET 4.0,那么您可以利用BlockingCollection类。

public class Example 
{ 
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>(); 

    public void Start() 
    { 
     var threads = new Thread[] 
      { 
       new Thread(Producer), 
       new Thread(Consumer), 
       new Thread(Consumer), 
       new Thread(Consumer) 
      }; 
     foreach (Thread thread in threads) 
     { 
      thread.Start(); 
     } 
    } 

    private void Producer() 
    { 
     while (true) 
     { 
      Thread.Sleep(TimeSpan.FromSeconds(5)); 
      ScheduledTask task = GetScheduledTask(); 
      if (task != null) 
      { 
       foreach (string file in task.Files) 
       { 
        m_Queue.Add(task); 
       } 
      } 
     } 
    } 

    private void Consumer() 
    { 
     // Make a connection to the resource that is assigned to this thread only. 
     while (true) 
     { 
      string file = m_Queue.Take(); 
      // Process the file. 
     } 
    } 
} 

我在上面的例子中绝对过分简化了一些东西,但我希望你能得到一般的想法。请注意,这是非常简单的,因为线程同步的方式并不多(大多数将嵌入到阻塞队列中),当然也不会使用对象。很明显,你将不得不添加正确的机制来优雅地关闭线程,但这应该相当容易。

1

我正在寻找类似问题的解答(处理计数随时间而增加)。

我看了一下您的应用程序架构和想建议你的东西,可以帮助你:

你听说过IOCP(输入输出完成端口)。

我不确定使用C#实现这一点,但在C/C++中它是一块蛋糕。 通过使用它可以创建一个唯一的线程池(该池中的线程数一般定义为2 x PC或服务器中处理器或处理器核的数量) 将此池与IOCP句柄关联并将池做这项工作。 查看这些功能的帮助: CreateIoCompletionPort(); PostQueuedCompletionStatus(); GetQueuedCompletionStatus();

一般情况下创建和退出线程可能非常耗时并导致性能损失和内存碎片。 有关于IOCP在MSDN和谷歌的成千上万的文献。