1

我已阅读:Nanomsg 多播带宽问题

但我不需要真正的多播 IP(例如 239.0.0.0:3000) 我的负载也很轻。所以我并不担心背压。

是的,我可以使用总线范式。但是假设我想先用 pubsub 进行测试。

发送者将什么用作发送到 tcp 的发送到 url 以发送到多个客户端?

(我实际上正在使用下一代 nanomsg):

https://nanomsg.github.io/nng/man/v1.0.0/nng_tcp.7.html

我可以发送到 tcp://*:3000

我可以将订阅者绑定到该地址吗?

4

2 回答 2

1

我从 gdamore 获得的信息似乎表明不能使用 tcp://*:3000 因为您需要将生产者/发布者绑定到某个接口。但是,一个固定的端点我现在有 1-1 个发布者到订阅者通过 tcp 工作。

(这在:https ://www.freelists.org/post/nanomsg/does-nanomsg-support-multi-producer-in-pubsub-mode,10 中讨论过)

既然我看到多播实际上是不可能的(直到允许 UDP 传输),我已经修改了我的问题并将其放在我的最终解决方案的上下文中。看:

nanomsg (nng) 中的多个发布者和订阅者

于 2018-06-24T15:45:36.203 回答
0

我可以寄到tcp://*:3000吗?

是的。

我可以将订阅者绑定到该地址吗?

是的。当然,一个人必须拥有自己的集合(显然不能“共享”任何专有资源)
但是,
必须使用正确配置的-s ( -s )PUB来“编织” ,以便覆盖所有那些远程访问点,它不需要自己知道。.connect()nng_dial()

使用反向的自由.bind()/.connect()是 ZeroMQ 和 nanomsg 的常见做法,但请检查是否保持这种选择自由。

...想先用 pubsub 进行测试。发送者将使用什么作为发送到 tcp发送到多个客户端的 URL?

好吧,如果刚从 ZeroMQ / nanomsg / nng 开始,事情似乎有点令人困惑。

首先:
初始 API v2.x 和不会只发送给多个客户端,而是始终发送给Socket 原型引擎可见的所有客户端(可能的配置调整细节超出了本文的范围)。

换句话说,PUB-side 应用程序代码根本不决定谁将收到消息。一旦看到所有客户端都在“交付列表”上,网络将不得不将所有副本传送到所有此类客户端,这些客户端能够进行临时连接(有些更快,有些晚,其余的人再也看不见了是一个单独的音乐会 - 也不是您的问题,如您之前的问题中所述)。

订阅机制是这里的PUB/SUB一个重要问题。和早期的zeromq v2.x 都执行的订阅 TOPIC 过滤,并且它实际上发生在每条消息被传递到远程客户端时,以远程 {PASS|FAIL} 拒绝为代价,基于该远程客户端订阅的主题。因此,所有消息都按设计流向所有客户端。

这意味着,您的网络级通信多播实际上对这种类型的本地(仅)网络没有任何好处(如您在上一个问题中发布的那样)PUB/SUB可扩展的正式通信模式原型。还没有发布来支持这种传输类的实现(而有)。

下一篇:
看来,对[ ZeroMQ 层次结构在不到五秒的时间内] 部分的主要概念差异进行简短阅读可能有助于消除乐高式基础设施设置的术语和概念:

代码显然是示意性的,不完整的,但可以阅读和理解:

// PUB_sender( "<tcp4>://<A.B.C.D>:<PORT>" ); // will launch it
//              <tcp6>://[::1]:<PORT>
//          <tls+tcp4>://<___aTransportClass_specific_AccessPoint_ADDRESS_>:<_aTransportClass_spcific_AccessPoint_SPECIFIER>
//
// -------------------------------------------------------------------
// SUB_client( "..." ); // anyone ( all ) who will "dial-in" will receive messages

#define PUT64( ptr, u )                              \
    do {                                             \
        (ptr)[0] = (uint8_t)(((uint64_t)(u)) >> 56); \
        (ptr)[1] = (uint8_t)(((uint64_t)(u)) >> 48); \
        (ptr)[2] = (uint8_t)(((uint64_t)(u)) >> 40); \
        (ptr)[3] = (uint8_t)(((uint64_t)(u)) >> 32); \
        (ptr)[4] = (uint8_t)(((uint64_t)(u)) >> 24); \
        (ptr)[5] = (uint8_t)(((uint64_t)(u)) >> 16); \
        (ptr)[6] = (uint8_t)(((uint64_t)(u)) >> 8);  \
        (ptr)[7] = (uint8_t)((uint64_t)(u));         \
    } while (0)
int
PUB_sender( const char *aTransportClassAccessPointURL )
{
   int        aRetVAL = -1;
   nng_socket aSOCKET = NNG_SOCKET_INITIALIZER;
// ---------------------------------aSOCKET = Context().socket( PUB );
   if ( ( aRetVAL = nng_pub0_open( &aSOCKET )
          ) != 0 )                  fatal( "nng_pub0_open",
                                            aRetVAL 
                                            );

// ---------------------------------aSOCKET.bind( aTransportClassAccessPointURL );
   if ( ( aRetVAL = nng_listen(     aSOCKET,
                                    aTransportClassAccessPointURL,
                                    NULL,
                                    NNG_FLAG_NONBLOCK
                                    )
          ) != 0 )                  fatal( "nng_listen",
                                            aRetVAL
                                            );

// ---------------------------------aSOCKET.setsockopt( LINGER, 0 );
   if ( ( aRetVAL = nng_setopt_int( aSOCKET,
                                    NNG_OPT_LINGER,
                                    0
                                    )
          ) != 0 )                  fatal( "nng_setopt_int",
                                            aRetVAL
                                            );
   for (;;)
   {
       char *   aBUF = "12345678";
       uint64_t aVAL;
       time_t   aNOW;

       aNOW = time( &aNOW ); printf( "PUB.send()-s: "); showdate( aNOW );

       PUT64( aBUF, (uint64_t) aNOW );

   // ----------------------------aSOCKET.send( aBUF, ... );
      if ( ( aRetVAL = nng_send(  aSOCKET,
                                  aBUF,
                                  sizeof( aBUF ),
                                  NNG_FLAG_ALLOC
                                  )
             ) != 0 )             fatal( "nng_send",
                                          aRetVAL
                                          );
   }
   nng_free(  aBUF, 8 );
   nng_close( aSOCKET );
}
于 2018-06-20T16:55:34.540 回答