2012-07-20 51 views
1

在WPF应用程序中,我有一个发布消息的第三方库。如何在单线程服务中正确分配多线程消息流?

的消息是这样的:

public class DialectMessage 
{ 
    public string PathAndQuery { get; private set; } 

    public byte[] Body { get; private set; } 

    public DialectMessage(string pathAndQuery, byte[] body) 
    { 
     this.PathAndQuery = pathAndQuery; 
     this.Body = body; 
    } 
} 

我设置从我app.cs文件外部信息源:

public partial class App : Application 
{ 
    static App() 
    { 
     MyComponent.MessageReceived += MessageReceived; 
     MyComponent.Start(); 
    } 

    private static void MessageReceived(Message message) 
    { 
     //handle message 
    } 

} 

这些信息可以从多个线程在同一时间进行发布,使可能一次多次调用事件处理程序。

我有一个服务对象,必须解析传入的消息。这个服务实现了以下接口:

internal interface IDialectService 
{ 
    void Parse(Message message); 
} 

而且我有一个默认的静态实例在我app.cs文件:

private readonly static IDialectService g_DialectService = new DialectService(); 

为了简化分析程序的代码,我想确保一次只能解析一条消息。

我也想避免锁定在我的事件处理程序,因为我不想阻止第三方对象。

因为这个要求,我不能直接调用从我的消息事件处理程序

什么是保证这个单线程执行正确的方法g_DialectService.Parse

我的第一个是将我的解析操作封装在Produce/Consumer模式中。为了达到这个目标,我已经尝试以下方法:

  1. 声明一个BlockingCollection在我app.cs:

    private readonly static BlockingCollection<Message> g_ParseOperations = new BlockingCollection<Message>(); 
    
  2. 更改我的事件处理程序的身体补充的操作:

    private static void MessageReceived(Message message) 
    { 
        g_ParseOperations.Add(message); 
    } 
    
  3. 创建从我的应用程序构造泵收集一个新的线程:

    static App() 
    { 
        MyComponent.MessageReceived += MessageReceived; 
        MyComponent.Start(); 
    
        Task.Factory.StartNew(() => 
        { 
         Message message; 
         while (g_ParseOperations.TryTake(out message)) 
         { 
          g_DialectService.Parse(message); 
         } 
        }); 
    } 
    

但是,这段代码似乎不起作用。从不调用服务分析方法。

此外,我不确定此模式是否允许我正确关闭应用程序。

我在代码中改变了什么以确保一切正常?

PS:我针对.NET 4.5

[编辑]一些搜索后,并the answer of ken2k,我可以看到我是在错误的地方采取的通话trytake。

我更新的代码现在是:

private readonly static CancellationTokenSource g_ShutdownToken = new CancellationTokenSource(); 

    private static void MessageReceived(Message message) 
    { 
     g_ParseOperations.Add(message, g_ShutdownToken.Token); 
    } 

    static App() 
    { 
     MyComponent.MessageReceived += MessageReceived; 
     MyComponent.Start(); 

     Task.Factory.StartNew(() => 
     { 
      while (!g_ShutdownToken.IsCancellationRequested) 
      { 
       var message = g_ParseOperations.Take(g_ShutdownToken.Token); 
       g_DialectService.Parse(message); 
      } 
     }); 
    } 

    protected override void OnExit(ExitEventArgs e) 
    { 
     g_ShutdownToken.Cancel(); 
     base.OnExit(e); 
    } 

此代码作为预期。消息按正确的顺序处理。但是,一旦我退出应用程序,就会在Take方法上得到一个“CancelledException”,即使我之前只是测试IsCancellationRequested。

+0

那么,你正在关闭应用程序 - 只是吃例外! – 2012-07-20 12:30:02

+0

..或更好,不要打扰取消线程。 – 2012-07-20 12:31:54

+0

这可能适用于我实际退出应用程序。但就我个人的知识而言,我想知道如何正确关机(想象一下启动/停止按钮)。 – 2012-07-20 12:33:23

回答

2

documentation说,大约BlockingCollection.TryTake(out T item)

如果集合是空的,此方法立即返回false。

所以基本上你的循环立即退出。你可能想要的是调用TryTake method用超时参数代替,并退出循环,当mustStop变量变为true

bool mustStop = false; // Must be set to true on somewhere else when you exit your program 
... 
while (!mustStop) 
{ 
    Message yourMessage; 

    // Waits 500ms if there's nothing in the collection. Avoid to consume 100% CPU 
    // for nothing in the while loop when the collection is empty. 
    if (yourCollection.TryTake(out yourMessage, 500)) 
    { 
     // Parses yourMessage here 
    } 
} 

为您编辑的问题:如果你的意思是你收到OperationCanceledException,这是好的,正是如何以CancellationToken对象作为参数的方法必须行为:)只需捕获异常并优雅地退出。

+0

发布后我看到了。让我更新我的问题,仍然存在一些问题。 – 2012-07-20 12:13:42

+0

@SteveB查看编辑答案 – ken2k 2012-07-20 12:33:19

+0

好的,我明白了。我不能简单地检查我的取消资格。取消要求。因为,在取消实际发生之前完成此检查。大部分时间,Take方法将处于等待状态。我已经添加了一个简单的try/catch(OperationCancelledException)并且工作正常。谢谢 – 2012-07-20 12:36:00