2015-11-14 98 views
0

这是我第一次尝试编写Windows服务。多线程Windows服务处理Windows Message Queue

此窗口服务必须处理2个窗口消息队列。

每个消息队列应该有自己的线程,但我似乎无法得到架构就地。

我跟着这个Windows Service to run constantly,它允许我创建一个线程,我正在处理一个队列。

所以这是我的服务类:

protected override void OnStart(string[] args) 
    { 
     _thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true }; 
     _thread.Start(); 
    } 

    private void WorkerThreadFunc() 
    { 
     _addressCalculator = new GACAddressCalculator(); 

     while (!_shutdownEvent.WaitOne(0)) 
     { 
      _addressCalculator.StartAddressCalculation(); 
     } 
    } 



    protected override void OnStop() 
    { 
     _shutdownEvent.Set(); 
     if (!_thread.Join(5000)) 
     { // give the thread 5 seconds to stop 
      _thread.Abort(); 
     } 
    } 

在我GACAddressCalculator.StartAddressCalculation()我创建一个队列处理器的对象,看起来像这样:

public void StartAddressCalculation() 
    { 
     try 
     { 
      var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1); 
      googleQueue.ProccessMessageQueue(); 

     } 
     catch (Exception ex) 
     { 

     } 

    } 

这是GISGoogleQueue

public class GISGoogleQueue : BaseMessageQueue 
{ 


    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread) 
     : base(queueName, threadCount, logger, messagesPerThread) 
    { 
    } 

    public override void ProccessMessageQueue() 
    { 
     if (!MessageQueue.Exists(base.QueueName)) 
     { 
      _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName)); 
      return; 
     } 

     var messageQueue = new MessageQueue(QueueName); 
     var myVehMonLog = new VehMonLog(); 
     var o = new Object(); 
     var arrTypes = new Type[2]; 
     arrTypes[0] = myVehMonLog.GetType(); 
     arrTypes[1] = o.GetType(); 
     messageQueue.Formatter = new XmlMessageFormatter(arrTypes); 

     using (var pool = new Pool(ThreadCount)) 
     { 

      // Infinite loop to process all messages in Queue 
      for (; ;) 
      { 
       for (var i = 0; i < MessagesPerThread; i++) 
       { 
        try 
        { 
         while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed 


         var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins 

         if (message != null) // Check if message is not Null 
         { 
          var monLog = (VehMonLog)message.Body; 
          pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool 
         } 

        } 
        catch (Exception ex) 
        { 

        } 

       } 
      } 
     } 
    } 

}

现在这适用于1消息队列,但如果我想处理另一个消息队列它不会发生,因为我在ProccessMessageQueue方法中有一个无限循环。

我想在一个单独的线程中执行每个队列。

我认为我在WorkerThreadFunc()中犯了一个错误,我必须以某种方式从那里启动两个线程或在OnStart()

此外,如果您有任何提示如何改善此服务将是伟大的。

通过我使用的池类从这个答案https://stackoverflow.com/a/436552/1910735线程池里面ProccessMessageQueue

回答

1

我建议改变你的服务类,如下所示(下面的注释):

protected override void OnStart(string[] args) 
{ 
    _thread = new Thread(WorkerThreadFunc) 
       { 
        Name = "Run Constantly Thread", 
        IsBackground = true 
       }; 
    _thread.Start(); 
} 

GISGoogleQueue _googleQueue1; 
GISGoogleQueue _googleQueue2; 
private void WorkerThreadFunc() 
{ 
    // This thread is exclusively used to keep the service running. 
    // As such, there's no real need for a while loop here. Create 
    // the necessary objects, start them, wait for shutdown, and 
    // cleanup. 
    _googleQueue1 = new GISGoogleQueue(...); 
    _googleQueue1.Start(); 
    _googleQueue2 = new GISGoogleQueue(...); 
    _googleQueue2.Start(); 

    _shutdownEvent.WaitOne(); // infinite wait 

    _googleQueue1.Shutdown(); 
    _googleQueue2.Shutdown(); 
} 

protected override void OnStop() 
{ 
    _shutdownEvent.Set(); 
    if (!_thread.Join(5000)) 
    { 
     // give the thread 5 seconds to stop 
     _thread.Abort(); 
    } 
} 

我忽略了你的GACAddressCalculator。从你展示的内容来看,它似乎是围绕GISGoogleQueue的薄包装。显然,如果它实际上做了一些你没有显示的东西,那就需要重新考虑。

请注意,在WorkerThreadFunc()中创建了两个GISGoogleQueue对象。下面我们来看看如何创建这些对象来实现合适的线程模型。

public class GISGoogleQueue : BaseMessageQueue 
{ 
    System.Threading.Thread _thread; 
    System.Threading.ManualResetEvent _shutdownEvent; 

    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread) 
     : base(queueName, threadCount, logger, messagesPerThread) 
    { 
     // Let this class wrap a thread object. Create it here. 
     _thread = new Thread(RunMessageQueueFunc() 
        { 
         Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(), 
         IsBackground = true 
        }; 
     _shutdownEvent = new ManualResetEvent(false); 
    } 

    public Start() 
    { 
     _thread.Start(); 
    } 

    public Shutdown() 
    { 
     _shutdownEvent.Set(); 
     if (!_thread.Join(5000)) 
     { 
      // give the thread 5 seconds to stop 
      _thread.Abort(); 
     } 
    } 

    private void RunMessageQueueFunc() 
    { 
     if (!MessageQueue.Exists(base.QueueName)) 
     { 
      _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName)); 
      return; 
     } 

     var messageQueue = new MessageQueue(QueueName); 
     var myVehMonLog = new VehMonLog(); 
     var o = new Object(); 
     var arrTypes = new Type[2]; 
     arrTypes[0] = myVehMonLog.GetType(); 
     arrTypes[1] = o.GetType(); 
     messageQueue.Formatter = new XmlMessageFormatter(arrTypes); 

     using (var pool = new Pool(ThreadCount)) 
     { 
      // Here's where we'll wait for the shutdown event to occur. 
      while (!_shutdownEvent.WaitOne(0)) 
      { 
       for (var i = 0; i < MessagesPerThread; i++) 
       { 
        try 
        { 
         // Stop execution until Tasks in pool have been executed 
         while (pool.TaskCount() >= MessagesPerThread) ; 

         // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins 
         var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); 

         if (message != null) // Check if message is not Null 
         { 
          var monLog = (VehMonLog)message.Body; 
          pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool 
         } 
        } 
        catch (Exception ex) 
        { 
        } 
       } 
      } 
     } 
    } 
} 

这种方法中心周围使用Thread对象由GISGoogleQueue类缠绕。对于您创建的每个GISGoogleQueue对象,您将得到一个包裹线程,该线程将在GISGoogleQueue对象上调用Start()后执行该工作。

几点。在RunMessageQueueFunc()中,您检查是否存在队列的名称。如果没有,则退出该功能。IF出现这种情况,线程退出了。问题在于,您可能希望在此过程中尽早进行检查。只是一个想法。

其次,注意你的无限循环已取代对抗_shutdownEvent对象进行检查。这样,当服务关闭时,循环将停止。对于时效性,你需要确保一个完整的一次循环并不需要太长时间。否则,您可能会在关机后5秒钟中止线程。该中止只存在,以确保一切都拆了,但如果可能的话,应避免。

我知道很多人会更喜欢使用Task类来做这样的事情。看来,你是里面RunMessageQueueFunc()。但对于在这个过程中运行的线程,我认为Task类是错误的选择,因为它将线程池中的线程关联起来。对我来说,那Thread类是什么建立。

HTH

+1

感谢您的好评。我认为你在RunMessageQueueFunc()中有一个错字:while(!_shutdownEvent.Wait(0))我认为它应该是:!_shutdownEvent.WaitOne(0) –

+0

不客气。纠正了错误。 –

0

您可以使用Parallel.ForEach这样的方式;

Parallel.ForEach(queueItems, ProcessQueue); //this will process each queue item in a separate thread 


private void ProcessQueue(QueueItem queue) 
{ 
    //your processing logic  
}