9

我正在尝试在发布/订阅模型中创建一个使用 ZeroMQ(通过 nuget 的 clrzmq .net 绑定(x86))的 C# Winform 应用程序。

经过大量搜索,我只能找到代码使用 while 语句无限期地处理新消息的独立 C# 示例。当我尝试使用这些示例时,我不知道将代码放在哪里,它只会阻止 gui 和其他所有内容。

我不知道不使用另一个线程是否不可能,但我的印象是 ZeroMQ 的异步行为可以在不编写额外线程的情况下工作。也许我只是不知道将 zeromq 代码放在哪里,或者我真的需要另一个线程。

如果有人可以提供一个简单的 pub/sub 示例,说明将代码实际插入到默认 C# winform 应用程序的位置,将不胜感激。

4

1 回答 1

7

我假设您在项目中使用clrzmq ZeroMq 包装器。据我所知,不可能使用 clrzmq 在简单的循环中接收非阻塞消息,它会无限期地阻塞一段特定的时间(通过向接收方法提供超时)或直到您收到一条消息.

但是,设置一个线程来定期轮询套接字并将传入消息推送到Queue. 然后,您可以使用例如一个简单的 WinFormsTimer定期将任何未决消息从该 (shared) 中取出Queue。这是一个线程订阅者的工作示例:

public class ZeroMqSubscriber
{
    private readonly ZmqContext _zmqContext;
    private readonly ZmqSocket _zmqSocket;
    private readonly Thread _workerThread;
    private readonly ManualResetEvent _stopEvent = new ManualResetEvent(false);
    private readonly object _locker = new object();
    private readonly Queue<string> _queue = new Queue<string>();

    public ZeroMqSubscriber(string endPoint)
    {
        _zmqContext = ZmqContext.Create();
        _zmqSocket = _zmqContext.CreateSocket(SocketType.SUB);
        _zmqSocket.Connect(endPoint);
        _zmqSocket.SubscribeAll();

        _workerThread = new Thread(ReceiveData);
        _workerThread.Start();
    }

    public string[] GetMessages()
    {
        lock (_locker)
        {
            var messages = _queue.ToArray();
            _queue.Clear();
            return messages;
        }
    }

    public void Stop()
    {
        _stopEvent.Set();
        _workerThread.Join();
    }

    private void ReceiveData()
    {
         try
         {
             while (!_stopEvent.WaitOne(0))
             {
                 var message = _zmqSocket.Receive(Encoding.UTF8, 
                               new TimeSpan(0, 0, 0, 1));
                 if (string.IsNullOrEmpty(message))
                     continue;

                 lock (_locker)
                     _queue.Enqueue(message);
             }
         }
         finally
         {
             _zmqSocket.Dispose();
             _zmqContext.Dispose();
         }
    }
}

Form您只需定期轮询队列(此示例使用 aForms Timer并将消息数据附加到 a Textbox):

private readonly ZeroMqSubscriber _zeroMqSubscriber = 
        new ZeroMqSubscriber("tcp://127.0.0.1:5000");

void ReceiveTimerTick(object sender, EventArgs e)
{
    var messages = _zeroMqSubscriber.GetMessages();
    foreach (var message in messages)
        _textbox.AppendText(message + Environment.NewLine);
}
于 2013-02-10T11:56:09.340 回答