3

我有一个使用 jeromq 0.3.2 用 Ja​​va 编写的多线程应用程序。我正在努力将它放入 Tanuki 服务包装器中,以便它作为 Windows 服务运行,并且我在干净地停止线程方面遇到了一些麻烦。代理的run()方法是简单海盗模式的变体,来自指南,如下所示:

public void run()
{
    ZContext context = new ZContext();
    Socket frontend = context.createSocket(ZMQ.ROUTER);
    Socket backend = context.createSocket(ZMQ.ROUTER);
    frontend.bind("tcp://*:5555");
    backend.bind("inproc://workers");

    while (!Thread.currentThread().isInterrupted())
    {
        ZMQ.Poller poller = new ZMQ.Poller(2);
        poller.register(frontend, ZMQ.Poller.POLLIN);
        poller.register(backend, ZMQ.Poller.POLLIN);
        poller.poll();
        if (poller.pollin(0))
        {
            processFrontend(frontend, backend, context);
        }
        if (poller.pollin(1))
        {
            processBackend(frontend, backend);
        }
    }

    // additonal code to close worker threads
}

当控制包装想要停止应用程序时,我怎样才能干净地退出这个循环?

如果当前没有连接客户端,则循环在调用时被阻塞poller.poll(),因此如果包装器调用interrupt()线程,则将其忽略。如果当前有客户端正在发送消息,则调用 会interrupt()导致该poller.poll()方法抛出一个zmq.ZError$IOException: java.nio.channels.ClosedByInterruptException

我也尝试过使用:

PollItem[] items = {
    new PollItem(frontend, Poller.POLLIN),
    new PollItem(backend, Poller.POLLIN)
};
int rc = ZMQ.poll(items, 2, -1);

if (rc == -1)
    break;

if (items[0].isReadable())
{
    processFrontend(frontend, backend, context);
}

if (items[1].isReadable())
{
    processBackend(frontend, backend);
}

但调用ZMQ.poll表现出相同的行为。两种可能的选择是:

  • 设置超时ZMQ.poll并将方法的内容包装在run()IOException 的 try/catch 中。
  • 向我的 Runnable 添加一个方法,该方法将连接到前端并发送一条特殊消息,该消息将被读入processFrontend并导致代码跳出循环。

第一个看起来有点脏,第二个感觉有点脆弱。还有其他我应该考虑的方法吗?我可以使用其他一些轮询方法来更清晰地对线程中断做出反应吗?

4

1 回答 1

3

最好将循环提取到单独的线程并打开一个额外的 inprocPULL套接字,以便循环从 3 个套接字轮询消息frontendbackendcontrol. 如果主线程收到中断信号,则应该向STOP代理循环发送命令。

正是在我的项目中实现了这种模式:https ://github.com/thriftzmq/thriftzmq-java/blob/master/thriftzmq/src/main/java/org/thriftzmq/ProxyLoop.java

在主线程上,您可以从其中添加一个shutdownHook并发送STOP命令到循环中。

 Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        try {
            //Connect to controlSocket and send STOP command
            ZMQ.Socket peer = context.socket(ZMQ.PUSH);
            peer.connect(CONTROL_ENDPOINT);
            peer.send(STOP_MESSAGE, 0);
            peer.close();
        } catch (Exception ex) {
            logger.error("Failed to stop service", ex);
            System.exit(1);
        }
   }
});

PS 使用 guava-library 中的原语来管理服务生命周期使生活变得更加轻松。

于 2014-02-27T12:48:09.380 回答