8

我想知道如何正确关闭 JeroMQ,到目前为止,我知道三种方法各有利弊,但我不知道哪一种是最好的。

情况:

  • 线程 A:拥有上下文,应提供启动/停止方法
  • 线程 B:实际监听线程

我目前的方法:

线程 A

static ZContext CONTEXT = new ZContext();
Thread thread;

public void start() {
    thread = new Thread(new B()).start();
}

public void stop() {
    thread.stopping = true;
    thread.join();
}

线程 B

boolean stopping = false;
ZMQ.Socket socket;

public void run() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    socket.setReceiveTimeout(10);

    while (!stopping) {
        socket.recv();
    }

    if (NUM_SOCKETS >= 1) {
        CONTEXT.destroySocket(socket);
    } else {
        CONTEXT.destroy();
    }
}

这很好用。10ms 关机对我来说没问题,但是当没有收到消息时,我会不必要地增加 CPU 负载。目前我更喜欢这个。


第二种方法在两个线程之间共享套接字:

线程 A

static ZContext CONTEXT = new ZContext();
ZMQ.Socket socket;
Thread thread;

public void start() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    thread = new Thread(new B(socket)).start();
}

public void stop() {
    thread.stopping = true;
    CONTEXT.destroySocket(socket);
}

线程 B

boolean stopping = false;
ZMQ.Socket socket;

public void run() {
    try {
        while (!stopping) {
            socket.recv();
        }
    } catch (ClosedSelection) {
        // socket closed by A
        socket = null;
    }
    if (socket != null) {
        // close socket myself
        if (NUM_SOCKETS >= 1) {
            CONTEXT.destroySocket(socket);
        } else {
            CONTEXT.destroy();
        }
    }
}

也像魅力一样工作,但即使recv已经阻止异常有时也不会被抛出。如果我在启动线程 A 后等待一毫秒,则总是会抛出异常。我不知道这是错误还是我滥用的影响,因为我共享套接字。


“revite”之前问过这个问题(https://github.com/zeromq/jeromq/issues/116)并得到了答案,这是第三个解决方案: https ://github.com/zeromq/jeromq/blob/master/ src/test/java/guide/interrupt.java

摘要:他们调用ctx.term()并中断socket.recv().

这工作正常,但我不想终止我的整个上下文,而只是这个单一的套接字。我必须为每个套接字使用一个上下文,所以我无法使用 inproc。

概括

目前,除了使用超时、共享套接字或终止整个上下文之外,我不知道如何让线程 B 脱离阻塞状态。

这样做的正确方法是什么?

4

3 回答 3

5

经常提到你可以只销毁 zmq 上下文,任何共享该上下文的东西都会退出,但这会造成一场噩梦,因为你退出的代码必须尽力避免意外调用死套接字对象的雷区。

尝试关闭套接字也不起作用,因为它们不是线程安全的,你最终会崩溃。

回答:最好的方法是按照 ZeroMQ 指南建议的方式通过多线程进行任何使用;使用 zmq 套接字而不是线程互斥锁/锁/等。设置一个额外的侦听器套接字,您将在关闭时连接并发送一些东西,并且您的 run() 应该使用 JeroMQ 轮询器来检查两个套接字中的哪一个接收到任何东西 - 如果额外的套接字接收到一些东西然后退出。

于 2014-12-01T23:25:32.560 回答
3

老问题,但以防万一......

我建议查看ZThread 源代码。您应该能够创建一个可以传递给fork方法的IAttachedRunnable实例,并且您的实例的 run 方法将传递一个 PAIR 套接字并在另一个线程中执行,而fork将返回连接的 PAIR 套接字以用于与您的IAttachedRunnable获得的 PAIR 套接字通信。

于 2014-07-19T12:12:52.063 回答
2

在此处查看 jeromq 源代码,即使您正在执行“阻塞”recv,您仍然会一直在消耗 CPU(线程从不休眠)。如果您对此感到担心,请在轮询之间让第二个线程休眠并让父线程中断。像(只是相关部分):

线程 A

public void stop() {
    thread.interrupt();
    thread.join();
}

线程 B

while (!Thread.interrupted()) {
  socket.recv(); // do whatever
  try {
    Thread.sleep(10); //milliseconds
  } catch (InterruptedException e) {
    break;
  }
}

此外,关于您的第二个解决方案,通常您不应该在线程之间共享套接字 - zeromq 指南对此非常清楚 - “不要在线程之间共享 ØMQ 套接字。ØMQ 套接字不是线程安全的。” 请记住,ZMQ 的一个主要用途是 IPC——线程通过连接的套接字进行通信,而不是共享一个套接字的同一端。不需要共享布尔停止变量之类的东西。

于 2014-04-30T01:41:51.407 回答