发布者是否可以使用 ZeroMQ 发布到同一台机器上的多个客户端?我想要一组客户端,每个客户端都可以使用 SocketType 进行标准的请求/响应调用。REQ和 SocketType。REP,但它也可以使用 SocketType 接收通知。SUB和 SocketType。酒吧。
尽管我的版本只有一个发布者,但我已尝试实现此拓扑,取自此处。
这是我的出版商:
public class ZMQServerSmall
{
public static void main(String[] args)
{
try (ZContext context = new ZContext())
{
ZMQ.Socket rep = context.createSocket(SocketType.REP);
rep.bind("tcp://*:5555");
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:7777");
while (!Thread.currentThread().isInterrupted())
{
String req = rep.recvStr(0);
rep.send(req + " response");
pub.sendMore("Message header");
pub.send("Message body");;
}
}
}
}
这是我的代理(我包括了一个侦听器来尝试查看发生了什么):
public class ZMQForwarderSmall
{
public static void main(String[] args)
{
try
(
ZContext context = new ZContext();
)
{
ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);
frontend.connect("tcp://*:7777");
ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
backend.bind("tcp://*:6666");
IAttachedRunnable runnable = new Listener();
Socket listener = ZThread.fork(context, runnable);
ZMQ.proxy(frontend, backend, listener);
}
catch (Exception e)
{
System.err.println(e.getMessage());
}
}
private static class Listener implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
while (true)
{
ZFrame frame = ZFrame.recvFrame(pipe);
if (frame == null)
break; // Interrupted
System.out.println(frame.toString());
frame.destroy();
}
}
}
}
这是我的订阅者:
public class ZMQClientSmall
{
public static void main(String[] args) throws IOException
{
String input;
try
(
ZContext context = new ZContext();
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
)
{
ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
reqSocket.connect("tcp://localhost:5555");
ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
subSocket.connect("tcp://localhost:6666");
subSocket.subscribe("".getBytes(ZMQ.CHARSET));
while ((input = stdIn.readLine()) != null)
{
reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
String response = reqSocket.recvStr(0);
String address = subSocket.recvStr(ZMQ.DONTWAIT);
String contents = subSocket.recvStr(ZMQ.DONTWAIT);
System.out.println("Notification received: " + address + " : " + contents);
}
}
}
}
这是测试。我打开四个终端;1 个发布者、1 个代理和 2 个客户端。当我在两个客户端中的任何一个中发出请求时,我希望在两者中都看到通知,但我只在发出请求的终端中看到通知。我知道两个客户端都使用相同的地址(localhost:6666),但我希望代理能解决这个问题。
任何人都可以在这里看到明显的错误吗?