考虑以下:
- 一组 3 个逻辑服务
S1
:S2
和S3
- 每个服务的两个实例都在运行,所以我们有以下进程:
S1P1
,S1P2
,S2P1
,S2P2
,S3P1
,S3P2
ZeroMQ
在单个进程中运行并可由所有服务进程访问的代理
假设一个逻辑服务发布一条逻辑服务感兴趣S1
的消息,并且. 每个逻辑服务只有一个进程必须接收,所以假设和。M1
S2
S3
M1
S2P1
S3P2
我尝试了以下方法,但没有成功:
- 代理线程 1 正在运行
XSUB/XPUB
代理 - 代理线程 2 正在运行一个连接到套接字的
ROUTER/DEALER
代理并订阅了所有内容(对于逻辑)ROUTER
XPUB
S1
- 代理线程 3 正在运行连接到套接字的
ROUTER/DEALER
代理并订阅了所有内容(对于逻辑)ROUTER
XPUB
S2
- 代理线程 4 正在运行一个连接到 XPUB 套接字的
ROUTER/DEALER
代理并订阅了所有内容(对于逻辑)ROUTER
S3
- 每个逻辑服务进程都在运行一个
REP
连接到代理DEALER
套接字的套接字线程
我认为XSUB/XPUB
代理会给我发布/订阅语义,并且代理会在套接字之间为代理发送的消息ROUTER/DEALER
引入竞争。REP
XSUB/XPUB
我怎样才能结合ZeroMQ
套接字来完成这个?
更新1
我知道“没有成功”没有帮助,我尝试了不同的配置并得到了不同的错误。我尝试的最新配置如下:
(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP
复制循环是这样的:
public void start() {
context = ZMQ.context(1);
subSocket = context.socket(ZMQ.SUB);
subSocket.connect(subSocketUrl);
subSocket.subscribe("".getBytes());
reqSocket = context.socket(ZMQ.REQ);
reqSocket.connect(reqSocketUrl);
while (!Thread.currentThread().isInterrupted()) {
final Message msg = receiveNextMessage();
resendMessage(msg);
}
}
private Message receiveNextMessage() {
final String header = subSocket.recvStr();
final String entity = subSocket.recvStr();
return new Message(header, entity);
}
private void resendMessage(Message msg) {
reqSocket.sendMore(msg.getKey());
reqSocket.send(msg.getData(), 0);
}
我得到的例外如下:
java.lang.IllegalStateException: Cannot send another request
at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]
我正在运行 JeroMQ 0.3.4、Oracle Java 8 JVM 和 Windows 7。