1

考虑以下:

  • 一组 3 个逻辑服务S1S2S3
  • 每个服务的两个实例都在运行,所以我们有以下进程:S1P1, S1P2, S2P1, S2P2, S3P1,S3P2
  • ZeroMQ在单个进程中运行并可由所有服务进程访问的代理

假设一个逻辑服务发布一条逻辑服务感兴趣S1的消息,并且. 每个逻辑服务只有一个进程必须接收,所以假设和。M1S2S3M1S2P1S3P2

我尝试了以下方法,但没有成功:

  • 代理线程 1 正在运行XSUB/XPUB代理
  • 代理线程 2 正在运行一个连接到套接字的ROUTER/DEALER代理并订阅了所有内容(对于逻辑)ROUTERXPUBS1
  • 代理线程 3 正在运行连接到套接字的ROUTER/DEALER代理并订阅了所有内容(对于逻辑)ROUTERXPUBS2
  • 代理线程 4 正在运行一个连接到 XPUB 套接字的ROUTER/DEALER代理并订阅了所有内容(对于逻辑)ROUTERS3
  • 每个逻辑服务进程都在运行一个REP连接到代理DEALER套接字的套接字线程

我认为XSUB/XPUB代理会给我发布/订阅语义,并且代理会在套接字之间为代理发送的消息ROUTER/DEALER引入竞争。REPXSUB/XPUB

我怎样才能结合ZeroMQ套接字来完成这个?

更新1

我知道“没有成功”没有帮助,我尝试了不同的配置并得到了不同的错误。我尝试的最新配置如下:

(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP

复制循环是这样的:

public void start() {
    context = ZMQ.context(1);

    subSocket = context.socket(ZMQ.SUB);
    subSocket.connect(subSocketUrl);
    subSocket.subscribe("".getBytes());

    reqSocket = context.socket(ZMQ.REQ);
    reqSocket.connect(reqSocketUrl);

    while (!Thread.currentThread().isInterrupted()) {
        final Message msg = receiveNextMessage();
        resendMessage(msg);
    }
}

private Message receiveNextMessage() {
    final String header = subSocket.recvStr();
    final String entity = subSocket.recvStr();

    return new Message(header, entity);
}

private void resendMessage(Message msg) {
    reqSocket.sendMore(msg.getKey());
    reqSocket.send(msg.getData(), 0);
}

我得到的例外如下:

java.lang.IllegalStateException: Cannot send another request
    at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
    at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]

我正在运行 JeroMQ 0.3.4、Oracle Java 8 JVM 和 Windows 7。

4

2 回答 2

3

您的连接似乎增加了一些复杂性ROUTER- 您应该能够直接连接到您的发布者的所有事情。

您当前遇到的错误是REQ套接字具有严格的消息排序模式-不允许send()连续两次,必须发送/接收/发送/接收/等(同样,REP套接字必须接收/发送/接收/发送/等)。从它的样子来看,您只是在您的REQ套接字上执行发送/发送/发送/等,而没有收到任何响应。如果您不关心同行的响应,那么您必须接收并丢弃它或使用DEALER(或ROUTER,但DEALER在您当前的图表中更有意义)。

我已经创建了一个图表,说明我将如何在下面完成此架构 - 使用您的基本流程结构。

Broker T1         Broker T2                Broker T3                Broker T4
(PUB*)------>(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)
       |_____________________||____|                  ||    |                  ||
       |_____________________||_______________________||____|                  ||
                             ||                       ||                       ||
     ========================||     ==================||            ===========||=
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
(REP*)         (REP*)          (REP*)          (REP*)          (REP*)          (REP*)
 S1P1           S1P2            S2P1            S2P2            S3P1            S3P2

所以,主要的区别是我放弃了你的(SUB copyLoop=> REQ)步骤。是否选择XPUB/XSUBvsPUB/SUB取决于您,但除非您当前想使用XPUB/XSUB.

显然,此图不处理信息如何进入您的代理,您当前在其中显示一个XSUB套接字 - 这超出了您迄今为止提供的信息的范围,大概您已经能够成功地将信息接收到您的代理,所以我不会处理那个。

我假设您专用于每个服务的代理线程正在明智地选择是否将消息发送到他们的服务?如果是这样,那么您选择让他们订阅所有内容应该可以正常工作,否则可能需要更智能的订阅设置。

如果您在REP服务进程上使用套接字,则服务进程必须接收该消息并异步处理它,永远不要将有关该消息的任何详细信息传达给代理。然后它必须用确认(如“RECEIVED”)响应每条消息,以便它遵循REP套接字的严格接收/发送/接收/发送模式。

如果您想要关于服务如何处理发送回代理的消息的任何其他类型的通信,REP则不再是适合您的服务进程的套接字类型,并且DEALER可能不再是适合您的代理的正确套接字类型。如果您想要某种形式的负载平衡以便发送到下一个打开的服务进程,您需要使用ROUTER/REQ并让每个服务指示其可用性,并让代理保留消息,直到下一个服务进程说它可用发回结果。如果您想要某种其他类型的消息处理,则必须指出这是什么,以便可以提出合适的架构。

于 2016-03-11T15:29:11.677 回答
1

显然,我混淆了一些元素:

  • 无论您将其用作客户端套接字 ( Socket.connect) 还是服务器端套接字 ( Socket.bind) ,套接字都具有相同的 API
  • 无论类型如何,套接字都具有相同的 API(例如Socket.subscribe,不应在PUSH套接字上调用)
  • 某些套接字类型需要发送/接收响应循环(例如REQ/REP
  • 沟通模式中的一些细微差别(PUSH/PULLvs ROUTER/DEALER
  • 调试 ZeroMQ 设置的困难(不可能?)

非常感谢 Jason 非常详细的回答(和很棒的图表!),为我指明了正确的方向。

我最终得到了以下设计:

  • 代理线程 1 正在运行一个扇出XSUB/XPUB代理,bind(localhost:6000)并且bind(localhost:6001)
  • 代理线程 2 正在运行一个队列SUB/PUSH代理connect(localhost:6001)并且bind(localhost:6002); 代理线程 3 和 4 使用具有不同绑定端口号的类似设计
  • 消息生产者使用PUB套接字连接到代理connect(localhost:6000)
  • 消息消费者使用PULL套接字连接到代理队列代理connect(localhost:6002)

在这个特定于服务的队列机制之上,我能够添加一个类似的特定于服务的扇出机制,相当简单:

  • 代理线程在和上运行SUB/PUB代理connect(localhost:6001)bind(localhost:6003)
  • 消息生产者仍然使用PUB套接字连接到代理connect(localhost:6000)
  • SUB消息消费者使用套接字连接到代理扇出代理connect(localhost:6003)

这是一次有趣的旅程。

于 2016-03-14T15:30:44.697 回答