1

在 WPF 应用程序中,我有一个正在发布消息的第三方库。

消息如下:

public class DialectMessage
{
    public string PathAndQuery { get; private set; }

    public byte[] Body { get; private set; }

    public DialectMessage(string pathAndQuery, byte[] body)
    {
        this.PathAndQuery = pathAndQuery;
        this.Body = body;
    }
}

我从我的 app.cs 文件中设置了外部消息源:

public partial class App : Application
{
    static App()
    {
        MyComponent.MessageReceived += MessageReceived;
        MyComponent.Start();
    }

    private static void MessageReceived(Message message)
    {
        //handle message
    }

}

这些消息可以一次从多个线程发布,从而可以一次多次调用事件处理程序。

我有一个必须解析传入消息的服务对象。该服务实现以下接口:

internal interface IDialectService
{
    void Parse(Message message);
}

我的 app.cs 文件中有一个默认的静态实例:

    private readonly static IDialectService g_DialectService = new DialectService();

为了简化解析器的代码,我想确保一次只解析一条消息。

我还想避免锁定我的事件处理程序,因为我不想阻止第 3 方对象。

由于这个要求,我不能直接g_DialectService.Parse从我的消息事件处理程序调用

确保这种单线程执行的正确方法是什么?

我的第一个想法是将我的解析操作包装在生产/消费者模式中。为了达到这个目标,我尝试了以下方法:

  1. 在我的 app.cs 中声明一个 BlockingCollection:

    private readonly static BlockingCollection<Message> g_ParseOperations = new BlockingCollection<Message>();
    
  2. 更改我的事件处理程序的主体以添加操作:

    private static void MessageReceived(Message message)
    {
        g_ParseOperations.Add(message);
    }
    
  3. 创建一个新线程,从我的应用构造函数中抽取集合:

    static App()
    {
        MyComponent.MessageReceived += MessageReceived;
        MyComponent.Start();
    
        Task.Factory.StartNew(() =>
        {
            Message message;
            while (g_ParseOperations.TryTake(out message))
            {
                g_DialectService.Parse(message);
            }
        });
    }
    

但是,此代码似乎不起作用。服务 Parse 方法永远不会被调用。

此外,我不确定这种模式是否允许我正确关闭应用程序。

我需要对代码进行哪些更改以确保一切正常?

PS:我的目标是.Net 4.5

[编辑] 经过一番搜索,以及ken2k 的答案,我可以看到我错误地调用 trytake 代替 take

我的更新代码现在是:

    private readonly static CancellationTokenSource g_ShutdownToken = new CancellationTokenSource();

    private static void MessageReceived(Message message)
    {
        g_ParseOperations.Add(message, g_ShutdownToken.Token);
    }

    static App()
    {
        MyComponent.MessageReceived += MessageReceived;
        MyComponent.Start();

        Task.Factory.StartNew(() =>
        {
            while (!g_ShutdownToken.IsCancellationRequested)
            {
                var message = g_ParseOperations.Take(g_ShutdownToken.Token);
                g_DialectService.Parse(message);
            }
        });
    }

    protected override void OnExit(ExitEventArgs e)
    {
        g_ShutdownToken.Cancel();
        base.OnExit(e);
    }

此代码按预期运行。消息以正确的顺序处理。但是,一旦我退出应用程序,我就会在 Take 方法上得到一个“CancelledException”,即使我之前只是测试了 IsCancellationRequested。

4

1 回答 1

2

文档BlockingCollection.TryTake(out T item)

如果集合为空,则此方法立即返回 false。

所以基本上你的循环会立即退出。您可能想要的是使用超时参数调用TryTake 方法mustStop,并在变量变为时退出循环true

bool mustStop = false;  // Must be set to true on somewhere else when you exit your program
...
while (!mustStop)
{
    Message yourMessage;

    // Waits 500ms if there's nothing in the collection. Avoid to consume 100% CPU
    // for nothing in the while loop when the collection is empty.
    if (yourCollection.TryTake(out yourMessage, 500))
    {
        // Parses yourMessage here
    }
}

对于您编辑的问题:如果您的意思是您收到了OperationCanceledException,那没关系,这正是将CancellationToken对象作为参数的方法必须表现的方式:) 只需捕获异常并优雅地退出。

于 2012-07-20T12:10:35.420 回答