18

我有两个应用程序,一个 C++ 服务器和一个 C# WPF UI。C++ 代码通过 ZeroMQ 消息传递 [PUB/SUB] 服务接受请求(来自任何地方/任何人)。我使用我的 C# 代码进行回溯测试并创建“回溯测试”并执行它们。这些回溯测试可以由许多“单元测试”组成,每个测试都从 C++ 服务器发送/接收数千条消息。

目前,单个回溯测试运行良好,可以发送 N 个单元测试,每个测试包含数千个请求和捕获。我的问题是建筑;当我发送另一个回测(在第一个测试之后)时,由于轮询线程没有被取消和处置,我遇到了第二次完成事件订阅的问题。这会导致错误的输出。这似乎是一个微不足道的问题(也许对你们中的一些人来说),但是在我当前的配置下取消这个轮询任务是很麻烦的。一些代码...

我的消息代理类很简单,看起来像

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

回测“引擎”用于执行每个回测,首先构造一个Dictionary包含每个Test(单元测试)和消息以分派给每个测试的 C++ 应用程序。

DispatchTests方法,来了

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

最后的PING消息,它告诉 C++ 我们已经完成了。然后我们强制等待,以便在从 C++ 代码接收到所有返回之前不会调度下一个 [unit] 测试 - 我们使用ManualResetEvent.

当 C++ 收到 PING 消息时,它会直接返回消息。我们通过处理接收到的消息OnMessageRecieved,PING 告诉我们设置,ManualResetEvent.Set()以便我们可以继续单元测试;“下一位”...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished reciving messages for this "unit test"
        testCompleted.Set(); 
    }
}

我的问题是,broker.Dispose()在 finally 上面永远不会被击中。我很欣赏在后台线程上执行的 finally 块不能保证被执行

上面划掉的文字是因为我弄乱了代码;在孩子完成之前,我正在停止父线程。但是,仍然存在问题...

现在broker.Dispose()被正确调用,并且broker.Dispose()被调用,在这个方法中我尝试取消轮询线程并Task正确处理以避免任何多个订阅。

要取消线程,我使用该CancelPolling()方法

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}

但在StartPolling()方法中

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested()永远不会被调用并且线程永远不会被取消,因此永远不会被正确处理。轮询线程被该subSocket.Receive()方法阻塞。

现在,我不清楚如何实现我想要的,我需要在用于轮询消息之外的线程上调用broker.Dispose()/以及如何强制取消。PollerCancel()线程中止不是我想不惜一切代价进入的。

本质上,我想broker在执行下一个回溯测试之前正确处理,如何正确处理这个问题,拆分轮询并在单独的应用程序域中运行它?

我已经尝试过,在处理程序中进行OnMessageRecived处理,但这显然是在与轮询器相同的线程上执行的,并且不是这样做的方法,没有调用其他线程,它会阻塞。

实现我想要的最好方法是什么,我可以遵循这种情况的模式吗?

谢谢你的时间。

4

2 回答 2

2

这就是我最终解决这个问题的方法[尽管我愿意接受更好的解决方案!]

public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    // Vars.
    private NetMQContext context;
    private PublisherSocket pubSocket;
    private Poller poller;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    /// <summary>
    /// Default ctor.
    /// </summary>
    public FeedMuxMessageBroker()
    {
        context = NetMQContext.Create();

        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);

        pollerCancelled = new ManualResetEvent(false);
        source = new CancellationTokenSource();
        token = source.Token;
        StartPolling();
    }

    #region Methods.
    /// <summary>
    /// Send the mux message to listners.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    /// <summary>
    /// Start polling for messages.
    /// </summary>
    private void StartPolling()
    {
        Task.Run(() =>
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    subSocket.ReceiveReady += (s, a) =>
                    {
                        buffer = subSocket.Receive();
                        if (MessageRecieved != null)
                            MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                    };

                    // Poll.
                    poller = new Poller();
                    poller.AddSocket(subSocket);
                    poller.PollTillCancelled();
                    token.ThrowIfCancellationRequested();
                }
            }, token).ContinueWith(ant => 
                {
                    pollerCancelled.Set();
                }, TaskContinuationOptions.OnlyOnCanceled);
    }

    /// <summary>
    /// Cancel polling to allow the broker to be disposed.
    /// </summary>
    private void CancelPolling()
    {
        source.Cancel();
        poller.Cancel();

        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }
    #endregion // Methods.

    #region Properties.
    /// <summary>
    /// Event that is raised when a message is recived. 
    /// </summary>
    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

    /// <summary>
    /// The address to use for the publisher socket.
    /// </summary>
    public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

    /// <summary>
    /// The address to use for the subscriber socket.
    /// </summary>
    public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
    #endregion // Properties.

    #region IDisposable Members.
    private bool disposed = false;

    /// <summary>
    /// Dispose managed resources.
    /// </summary>
    /// <param name="disposing">Is desposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                CancelPolling();
                if (pubSocket != null)
                {
                    pubSocket.Disconnect(PublisherAddress);
                    pubSocket.Dispose();
                    pubSocket = null;
                }
                if (poller != null)
                {
                    poller.Dispose();
                    poller = null;
                }
                if (context != null)
                {
                    context.Terminate();
                    context.Dispose();
                    context = null;
                }
                if (source != null)
                {
                    source.Dispose();
                    source = null;
                }
            }

            // Shared cleanup logic.
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finalizer.
    /// </summary>
    ~FeedMuxMessageBroker()
    {
        Dispose(false);
    }
    #endregion // IDisposable Members.
}

所以我们以同样的方式轮询,但使用Poller来自 NetMQ 的类。在我们设置的任务延续中,我们确定PollerTask都被取消了。然后我们可以安全地处理...

于 2015-05-05T09:22:05.170 回答
1

对主题的更高层次的看法

您致力于创建测试框架的专注和努力表明您的意愿旨在开发一种严格和专业级的方法,这使我首先举起帽子,对如此勇敢的事业表示钦佩。

虽然测试是提供合理定量证据的一项重要活动,即被测系统正在满足定义的期望,但这方面的成功取决于测试环境与实际部署条件的接近程度。

人们可能会同意,在另一个不同的基础上进行测试并不能证明实际部署将在与测试环境主要不同的环境中按预期运行。


元素控制或状态控制,这就是问题所在。

您的努力(至少在发布 OP 时)集中在代码架构上,它试图将实例保持在原位并尝试在下一个测试电池开始之前重新设置 Poller 实例的内部状态。

在我看来,测试有几个原则要遵循,你是否应该争取专业的测试:

  • 测试可重复性原则(测试的重新运行应提供相同的结果,从而避免仅提供结果的准测试 - “彩票”)

  • 非干预测试原则(测试的重新运行不应受到“外部”干扰,不受测试场景控制)

话虽如此,让我带来一些灵感来自于诺贝尔奖获得者 Harry Markowitz 的笔记,他因出色的量化投资组合优化研究而获奖。

而是后退一步来控制元素的整个生命周期

CACI Simulations, Inc.(Harry Markowitz 的公司之一)在 90 年代初开发了他们的旗舰软件框架 COMET III - 一个功能异常强大的仿真引擎,用于在大规模计算中运行的大型复杂设计原型和性能仿真/网络/电信网络。

COMET III 给人的最大印象是它能够生成测试场景,包括可配置的预测试“热身”预加载,这使得被测元件进入类似于机械中“疲劳”含义的状态酷刑试验或氢扩散脆性对核电站冶金学家意味着什么。

是的,一旦您深入了解算法、节点缓冲区、内存分配、管道/负载平衡/网格处理架构选择、故障恢复开销、垃圾收集策略和有限资源共享算法的低级细节工作和影响(在实际使用的工作负载模式“压力”下)端到端的性能/延迟,这个特性是必不可少的。

这意味着,单个与实例相关的简单状态控制是不够的,因为它不提供测试可重复性和测试隔离/非干预行为的手段。简而言之,即使您找到“重置” Poller 实例的方法,这也不会让您进入具有保证测试可重复性和可能的​​预测试热身的实际测试。

需要退一步和更高层的抽象和测试场景控制。

这如何适用于 OP 问题?

  • 而不仅仅是状态控制
  • 创建多层架构/控制平面/单独的信令

支持此目标的 ZeroMQ 方式

  • 将超结构创建为非平凡的模式
  • 使用测试场景中使用的实例的完整生命周期控制
  • 保持 ZeroMQ 格言:零共享、零阻塞、...
  • 受益于 Multi-Context()
于 2015-05-01T10:15:00.240 回答