6

我正在使用 .NET 中的 ZeroMQ,但在尝试解决一个奇怪的问题时遇到了困难。我有一个 PUSH 类型的套接字和一个 PULL over TCP 类型的套接字。当客户端断开连接时,服务器仍然能够发送一条消息(请注意,没有标志传递给 Socket.Send 方法),在开始阻塞并等待客户端重新连接并传递我尝试的消息之前,它会变得完全很多之后发送。

我怎样才能避免丢失消息(或者在最坏的情况下测试客户端是否已连接并且如果没有发送我可以承受丢失的虚拟消息)?

提前致谢!

编辑:进一步的测试表明,如果我在客户端断开连接后发送第一条消息后等待 1 秒,第二条消息将阻塞,但如果我根本不等待,我可以发送尽可能多的消息,他们会都迷路了。这很令人困惑...

4

1 回答 1

3

ZeroMQ 文档指出,这是 PUSH/PULL 设置的问题,并建议采用以下模式:当您期望订阅者数量固定时,添加 REP/REQ 设置以提供节点协调。但是,如果您无法提前知道订阅者的数量,您应该考虑更改您的协议以更好地适应这些条件。

C 中的同步发布者(来自 ZGuide)

//
//  Synchronized publisher
//
#include "zhelpers.h"

//  We wait for 10 subscribers
#define SUBSCRIBERS_EXPECTED  10

int main (void) 
{
    s_version_assert (2, 1);
    void *context = zmq_init (1);

    //  Socket to talk to clients
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5561");

    //  Socket to receive signals
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    //  Get synchronization from subscribers
    int subscribers = 0;
    while (subscribers < SUBSCRIBERS_EXPECTED) {
        //  - wait for synchronization request
        char *string = s_recv (syncservice);
        free (string);
        //  - send synchronization reply
        s_send (syncservice, "");
        subscribers++;
    }
    //  Now broadcast exactly 1M updates followed by END
    int update_nbr;
    for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
        s_send (publisher, "Rhubarb");

    s_send (publisher, "END");

    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_term (context);
    return 0;
}
于 2011-04-04T13:59:18.103 回答