3

我一直在尝试将 NIO 与多线程读取处理配对,以制作可扩展的服务器。我不能使用任何框架,例如 Netty 或 MINA,因为有几个低级别的客户端-服务器协议细节在任何一个框架中都很难实现。我只是在检查我的代码,我意识到这个片段中有一个潜在的竞争条件:

//executes in selector thread
public void runSelector() {
    //...
    Set<SelectionKey> keys = selector.selectedKeys();
    for (Iterator<SelectionKey> keyIter = keys.iterator(); keyIter.hasNext(); ) {
        final SelectionKey key = keyIter.next();
        keyIter.remove();
        if (key.isValid() && key.isReadable()) { //point A
            //maybe some other short calculations
            ((SocketChannel) key.channel()).read(buffer); //point B
            workerThreadPool.submit(new Runnable() { public void run() { processRead(key, buffer); } });
        }
    }
    //...
}

//executes in a worker thread
private void processRead(SelectionKey key, ByteBuffer buf) {
    //... somewhere
    key.cancel();
    //...
}

这是一个极不可能发生的事件,但完全有可能在工作线程中,我调用 key.cancel() 而选择器线程位于我在 runSelector() 方法中注释的两点之间。请记住,这可以部署在高并发机器上,并且运行选择器线程的 CPU 内核可能会陷入困境。在选择器线程中的 key.isReadable() 和 channel.read() 之间的工作线程中调用 key.cancel() 并导致 CancelledKeyException 是否是一个有效的担忧?我是否应该有某种线程安全的集合来存储所有要取消的键,以便 runSelector() 可以在迭代结束时取消所有键?像 Netty 或 MINA 这样更专业的项目是如何处理这种情况的?

4

1 回答 1

2

锁定资源,

synchronized(key) {
    ((SocketChannel) key.channel()).read(buffer); //point B
}

如果这看起来很奇怪,请查看这个关于并发的 Oracle 教程。

于 2012-06-20T16:17:08.467 回答