19

我阅读了ZeroMq 指南,偶然发现了以下内容:

您不得在线程之间共享 ØMQ 套接字。ØMQ 套接字不是线程安全的。从技术上讲,这样做是可能的,但它需要信号量、锁或互斥体。这将使您的应用程序缓慢而脆弱。在线程之间共享套接字的唯一地方是在语言绑定中需要像套接字上的垃圾收集这样的魔法。

后来:

记住:不要使用或关闭套接字,除非在创建它们的线程中。

我也明白 ZeroMQContext是线程安全的。

如果一个类注册另一个类的事件,在 .Net 中,该事件可能会从与创建侦听器的线程不同的线程调用。

我认为只有两种选择可以通过 ZeroMQ-Sockets 从事件处理程序中分派某些东西:

  • 将 eventhandler-invoking-thread 同步到 ZeroMQ-Socket创建的线程
  • 使用线程安全的 ZeroMQ- 为事件处理程序中的线程创建一个新的 ZeroMQ- Socket/获取现有的 ZeroMQ-SocketContext

似乎 0MQ-Guide 不鼓励第一个,我不认为为每个线程创建一个新的 ZeroMq-Socket 是高性能的/要走的路。

我的问题
从事件处理程序中通过 0MQ 发布消息的正确模式(它的意思是)是什么?

此外,该指南的作者在编写时是否考虑到了 .Net 的 ZeroMQ 绑定:

在线程之间共享套接字的唯一地方是在语言绑定中需要像套接字上的垃圾收集这样的魔法。?

这是一些示例代码来强调我的问题/问题:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}
4

3 回答 3

27

您可以创建许多 0MQ 套接字,当然与您拥有的线程一样多。如果您在一个线程中创建套接字,并在另一个线程中使用它,则必须在两个操作之间执行完整的内存屏障。其他任何事情都会导致 libzmq 出现奇怪的随机故障,因为套接字对象不是线程安全的。

有一些常规模式,但我不知道这些模式如何专门映射到 .NET:

  1. 在使用它们的线程中创建套接字,期间。在紧密绑定到一个进程中的线程之间共享上下文,并在未紧密绑定的线程中创建单独的内容。在高级 C API (czmq) 中,这些被称为附加和分离线程。
  2. 在父线程中创建一个套接字,并在线程创建时传递给附加线程。线程创建调用将执行完整的内存屏障。从那时起,只能在子线程中使用套接字。“使用”是指recv、send、setsockopt、getsockopt 和close。
  3. 在一个线程中创建一个套接字,并在另一个线程中使用,在每次使用之间执行您自己的完整内存屏障。这是非常微妙的,如果你不知道什么是“完整的内存屏障”,你不应该这样做。
于 2011-04-30T14:05:01.623 回答
7

在 .net framework v4 及更高版本中,您可以使用并发收集来解决此问题。即生产者-消费者模式。多个线程(处理程序)可以将数据发送到线程安全队列,并且只有单个线程使用队列中的数据并使用套接字发送数据。

这是想法:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
于 2012-11-30T20:58:42.317 回答
3

不要忘记查看 inproc 传输。使用 inproc:// 套接字进行线程间通信并有一个线程打开套接字以与其他进程/服务器通信可能很有用。

您仍然需要每个线程至少一个套接字,但 inproc 根本不涉及 IP 网络层。

于 2011-07-01T01:47:30.310 回答