1

我正在尝试使用 java 风格的 zmq 来测试在我的项目中使用 PGM over TCP 的好处。因此,我将 zmq 指南中的天气示例更改为使用 epgm 传输。一切都编译并运行,但没有发送或接收任何内容。如果我将传输改回 TCP,服务器会接收到客户端发送的消息,并且我会得到我期望的控制台输出。

那么,使用 PGM 有哪些要求呢?我更改了传递给绑定和连接方法的字符串,以遵循 zmq_pgm 的 zmq api:“transport://interface;multicast address:port”。那没有用。每当我尝试使用这种格式时,都会出现无效参数错误。因此,我通过删除“有效”的界面和分号来简化它,但我没有得到任何结果。

我找不到使用 pgm/epgm 的 jzmq 示例,并且 java 绑定的 api 文档没有为传递给绑定或连接的端点定义适当的字符串格式。那么我在这里错过了什么?我必须为客户端和服务器使用不同的主机吗?

需要注意的一点是,我在 VirtualBox VM(Ubuntu 14.04/OSX Mavericks 主机)上运行我的代码。我不确定这是否与我目前面临的问题有关。

服务器:

public class wuserver {

public static void main (String[] args) throws Exception {
    //  Prepare our context and publisher
    ZMQ.Context context = ZMQ.context(1);

    ZMQ.Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("epgm://xx.x.x.xx:5556");
    publisher.bind("ipc://weather");

    //  Initialize random number generator
    Random srandom = new Random(System.currentTimeMillis());
    while (!Thread.currentThread ().isInterrupted ()) {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode = 10000 + srandom.nextInt(10000) ;
        temperature = srandom.nextInt(215) - 80 + 1;
        relhumidity = srandom.nextInt(50) + 10 + 1;

        //  Send message to all subscribers
        String update = String.format("%05d %d %d", zipcode, temperature, relhumidity);
        publisher.send(update, 0);
    }

    publisher.close ();
    context.term ();
   }
}

客户:

public class wuclient {

public static void main (String[] args) {
    ZMQ.Context context = ZMQ.context(1);

    //  Socket to talk to server
    System.out.println("Collecting updates from weather server");
    ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
    //subscriber.connect("tcp://localhost:5556");
    subscriber.connect("epgm://xx.x.x.xx:5556");

    //  Subscribe to zipcode, default is NYC, 10001
    String filter = (args.length > 0) ? args[0] : "10001 ";
    subscriber.subscribe(filter.getBytes());

    //  Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        //  Use trim to remove the tailing '0' character
        String string = subscriber.recvStr(0).trim();

        StringTokenizer sscanf = new StringTokenizer(string, " ");
        int zipcode = Integer.valueOf(sscanf.nextToken());
        int temperature = Integer.valueOf(sscanf.nextToken());
        int relhumidity = Integer.valueOf(sscanf.nextToken());

        total_temp += temperature;

    }
    System.out.println("Average temperature for zipcode '"
            + filter + "' was " + (int) (total_temp / update_nbr));

    subscriber.close();
    context.term();
  }
}
4

2 回答 2

0

需要注意的另一点是,一些虚拟化环境——启用了 mac_filter 选项的 RHEV/Ovirt 和 libvirt/KVM 浮现在脑海中——默认情况下,通过 (eb|ip)tables 中性化一个人的能力以在来宾之间使用 mcast . 当然,使用 libvirt,解决方案是简单地将选项设置为 '0' 并重新启动 libvirtd。RHEV/Ovirt 需要一个自定义插件。

无论如何,我建议在您正在使用的每个系统上的网络设备上放置一个嗅探器,并观察以确保退出一台主机的流量在另一台主机上实际上是可见的。

于 2014-06-09T19:00:16.393 回答
0

有几种可能性:

  • 您需要确保使用以下--with-pgm选项编译 ZMQ:请参阅此处- 但如果您没有看到“协议不支持”,这似乎不是您的问题
  • 使用 rawpgm需要 root 权限,因为它需要创建原始套接字的能力......但epgm不需要,所以这也不应该是你的问题(我只是因为你使用术语“ pgm /epgm”而提出它,并且您应该知道它们并非在所有情况下都同样可用)
  • 在您的情况下,实际上出现的问题是 pgm/epgm 需要沿网络路径的支持。从理论上讲,它需要对您的路由器提供支持,因此您的应用程序可以发送一条消息并让您的路由器向每个客户端发送多条消息,但是如果您的服务器足够了解,它可能会立即发送多条消息并绕过这个路由器支持。问题是,正如您正确猜到的那样,不支持尝试在一台主机上完成所有操作。

因此,客户端和服务器需要不同的主机。

于 2014-05-27T16:13:36.000 回答