1

我认为这个问题真的说明了一切。

对于一些背景知识,我有一个订阅者,我正在尝试为其编写一些测试。为了做到这一点,我从测试中启动了一个发布者,指定tcp://localhost:[port]为地址。发送消息时,订阅者不会收到它。下面是一些示例代码来演示:

   string address   =    "tcp://localhost:1026";
// string address   = "inproc://localhost1026";

   var    pubSocket =  new PublisherSocket();
   pubSocket.Bind(address);

   var subSocket = new SubscriberSocket();
   subSocket.Connect(address);
   subSocket.SubscribeToAnyTopic();

   pubSocket.SendFrame("Hello world!", false);

   Console.WriteLine(subSocket.ReceiveFrameString()); /* <-- tcp transport 
                                                         waits here forever
                                                         */
   subSocket.Dispose();
   pubSocket.Dispose();

如果我将协议更改为,inproc://那么一切都很好。但是,我不想在我的测试中这样做,因为我还想测试一个监视器套接字,这不会引发inproc://连接事件(据我所知)。

请注意,我在 C# 代码中使用 NetMQ(在 .NET Framework 4.6.2 下运行)。

4

2 回答 2

3

ZeroMQ (NetMQ) TCP 传输可以在同一进程中的发布者和订阅者之间使用吗?

当然,
目前还没有任何限制可以阻止您这样做。. .

您的观察与隐藏处理的延迟有关,它发生在主引擎内部......在 ZeroMQContext()实例内部。

事情不会发生在零时间

好吧,人们可能会选择在 Pieter HINTJENS 的《ZeroMQ Zen-of-Zero上的“ Code Connected, Volume 1 ”》之后推迟这一期。这两者都有意义,很有意义,不仅在这里)。

因此,虽然inproc://传输类具有(几乎)零资源,但它是一个纯粹的私有“进程内”内存映射抽象(当然,除了一些[ns]“设备”,如信号量/锁), ZeroMQ 基础设施以“立即”的方式启动并运行,tcp://它没有这种舒适性,因为它必须首先使用 O/S 生成所有传输类特定的合同,使用设备驱动程序,实例化传输- 类特定 ISO/OSI-{ L0, L1, L3, L3+ }-处理策略,将相应的数据泵代码实例化到Context()RTO 状态,分配和映射内存缓冲区用于这些目的,所以很多之前要做的工作PUB-side 进入 RTO 状态,它(在 API 4.+ 的较新版本下)也有责任接收SUB处理订阅服务遥测,因为它承担每个客户端 TOPIC-的集中责任过滤列表处理。

这就是为什么它导致挂.recv( ..., ZMQ_BLOCK )在 NetMQ 包装抽象内部的subSocket.ReceiveFrameString().

要对其进行测试,只需进行修改后的测试:

  // --------------------------------------------------- // DEMO PSEUDO-CODE
     string rF = "";

     while True:
         
           pubSocket.SendFrame( "Hello world!", false ); // keep sending ...
                                                         // also may count++
                                                         // so as to "show" how
                                                         // many loops it took
           rF = subSocket.ReceiveFrameString(   false ); // non-blocking mode ~
                                                         // .recv(  ZMQ_NOBLOCK )
                                                         // or may use Poll()
                                                         // to just sniff for
                                                         // a message presence
           if  rF == "":
               continue; // ---^ LOOP NEXT, AS DID .recv() NOTHING YET
           break; // ----------v BREAK      AS DID .recv() MESSAGE
// ----------------------------------------------------------------------
   Console.WriteLine( rF );

可以在这里进行更多的努力和实验,以查看.connect()相关开销的关键作用,以及在设置订阅时不要错过SUB-side 信号遥测 + 接收它的需要 + 在PUB-side,在任何消息首先被发送到预期之前,否则只是“永远”等待,SUB


.sleep( someGuestimateTIME )@HesamFaridmehr已经提出了一个足够“胖”的建议,在-sideSUB既有.connect()-ed 加上它之后.setsockopt( ZMQ_SUBSCRIBE )(必须首先交付并在 -side 上以适当的方式处理PUB(以配置 TOPIC-filter 列表处理器处理器正确))在第一个之前的所有足够好PUB.send()将通过使其“间接”阻塞并停止代码执行流程来掩盖根本原因,而不是使解决方案足够智能- 使用非阻塞形式Poll()for示例 - 对于专业的设计最佳实践,其中一个确实可以但服从程序集黑客钟爱的第一行存在宏#ASSUME NOTHING;

于 2018-05-23T20:10:28.473 回答
2

因为pub/sub模式就像收音机。发布者不会等待订阅者连接,如果没有订阅者,它将忽略发送。您可以通过添加Thread.Sleep(1000);后行来测试它subSocket.SubscribeToAnyTopic();,您将看到您将收到该消息。

inproc,绑定的时间少于tcp这就是您收到消息的原因

并且inproc,发布者应该在订阅者连接之前启动

于 2018-05-23T14:08:45.270 回答