我有一个使用 jeromq 0.3.2 用 Java 编写的多线程应用程序。我正在努力将它放入 Tanuki 服务包装器中,以便它作为 Windows 服务运行,并且我在干净地停止线程方面遇到了一些麻烦。代理的run()
方法是简单海盗模式的变体,来自指南,如下所示:
public void run()
{
ZContext context = new ZContext();
Socket frontend = context.createSocket(ZMQ.ROUTER);
Socket backend = context.createSocket(ZMQ.ROUTER);
frontend.bind("tcp://*:5555");
backend.bind("inproc://workers");
while (!Thread.currentThread().isInterrupted())
{
ZMQ.Poller poller = new ZMQ.Poller(2);
poller.register(frontend, ZMQ.Poller.POLLIN);
poller.register(backend, ZMQ.Poller.POLLIN);
poller.poll();
if (poller.pollin(0))
{
processFrontend(frontend, backend, context);
}
if (poller.pollin(1))
{
processBackend(frontend, backend);
}
}
// additonal code to close worker threads
}
当控制包装想要停止应用程序时,我怎样才能干净地退出这个循环?
如果当前没有连接客户端,则循环在调用时被阻塞poller.poll()
,因此如果包装器调用interrupt()
线程,则将其忽略。如果当前有客户端正在发送消息,则调用 会interrupt()
导致该poller.poll()
方法抛出一个zmq.ZError$IOException: java.nio.channels.ClosedByInterruptException
我也尝试过使用:
PollItem[] items = {
new PollItem(frontend, Poller.POLLIN),
new PollItem(backend, Poller.POLLIN)
};
int rc = ZMQ.poll(items, 2, -1);
if (rc == -1)
break;
if (items[0].isReadable())
{
processFrontend(frontend, backend, context);
}
if (items[1].isReadable())
{
processBackend(frontend, backend);
}
但调用ZMQ.poll
表现出相同的行为。两种可能的选择是:
- 设置超时
ZMQ.poll
并将方法的内容包装在run()
IOException 的 try/catch 中。 - 向我的 Runnable 添加一个方法,该方法将连接到前端并发送一条特殊消息,该消息将被读入
processFrontend
并导致代码跳出循环。
第一个看起来有点脏,第二个感觉有点脆弱。还有其他我应该考虑的方法吗?我可以使用其他一些轮询方法来更清晰地对线程中断做出反应吗?