12

我有一个Thread带有私有Selector和公共register(SelectableChannel channel, ...)方法的子类,它允许其他线程将通道注册到选择器。

正如这里回答的那样,通道register()在选择器select()/期间阻塞,select(long timeout)所以我们需要wakeup()选择器。

我的线程无限期地选择(除非它被中断)并且它实际上设法在register()调用通道之前进入下一个选择。所以我想我用一个带synchronized块的简单锁来确保register()首先发生。

代码:(为了便于阅读,删除了不相关的代码)

public class SelectorThread extends Thread {
  ...

  public void register(SelectableChannel channel, Attachment attachment) throws IOException {
    channel.configureBlocking(false);
    synchronized (this) { // LOCKING OCCURS HERE
      selector.wakeup();
      channel.register(selector,
                       SelectionKey.OP_READ,
                       attachment);
    }
  }

  @Override
  public void run() {
    int ready;
    Set<SelectionKey> readyKeys;
    while (!isInterrupted()) {
      synchronized (this) {} // LOCKING OCCURS HERE

      try {
        ready = selector.select(5000);
      } catch (IOException e) {
        e.printStackTrace();
        continue;
      }

      if (ready == 0) {
        continue;
      }

      readyKeys = selector.selectedKeys();

      for (SelectionKey key : readyKeys) {
        readyKeys.remove(key);

        if (!key.isValid()) {
          continue;
        }

        if (key.isReadable()) {
          ...
        }
      }
    }
  }
}

This simple lock allows register() to happen before the thread continues with the next select loop. As far as I tested, this works as supposed.

Questions: Is that a "good" way to do it or are there any serious downsides to that? Would it be better to use a List or Queue (as suggested here) to store channels for registration, or a more sophisticated lock like this instead? What would the Pros/Cons of that be? Or are there any "even better" ways?

4

4 回答 4

4

Just treat Selector etc as not thread safe, perform all select related actions on the same thread, as Darron suggested.

The concurrency model of NIO selector is bullshit. I must call it out, because it's a huge waste of time for everyone who try to study it. In the end, the conclusion is, forget it, it's not for concurrent use.

于 2012-10-10T16:48:03.883 回答
4

All you need is a wakeup() before the register(), and, in the select loop, a short sleep before continuing if 'ready' is zero, to give register() a chance to run. No additional synchronisation: it's bad enough already; don't make it any worse. I'm not a fan of these queues of things to register, cancel, change interest ops, etc: they just sequentialize things that can really be done in parallel.

于 2012-10-11T03:11:51.787 回答
3

I am actually surprised the lock acquisition with the empty block is not removed at compile time. Pretty cool that it works. I mean it works, it's preemptive, it's not the prettiest approach, but it works. It's better than a sleep as it is predictable and since you use the wakeup call you know progress will be made as needed rather than on a periodic update if you purely relied on the select timeout.

The main downside of this approach is that you are saying that calls to register trump anything else, even servicing requests. Which may be true in your system, but usually this is not the case, I would say this is a possible issue. A minor issue which is more forward thinking is that you lock on the SelectorThread itself which is sort of a larger object in this case. Not bad, not great though as you expand, this lock will just have to documented and taken into account whenever other clients use this class. Personally I would go with making another lock altogether to avoid any unforeseen future hazards.

Personally, I like the queuing techniques. They assign roles to your threads, like a master and workers this way. Whereas all types of control happen on the master, like after every select check for more registrations from a queue, clear and farm out any read tasks, handler any changes in the overall connection setup (disconnects etc)... The "bs" concurrency model seems to accept this model pretty well and it's a pretty standard model. I don't think it's a bad thing as it makes the code a bit less hacky, more testable, and easier to read imo. Just takes a little more time to write out.

While I will admit, it has been a long time since I last wrote this stuff, there are other libraries out there that sort of take care of the queueing for you.

Grizzly Nio Framework while a little old, last time I used it, the main runloop was not bad. It setup a lot of the queuing for you.

Apache Mina Similar in that it provides a queuing framework.

But I mean in the end it depends on what you are working on.

  • is it a one man project just to play around with the framework?
  • is it a piece of production code that you want to live on for years?
  • is it a piece of production code that you are iterating on?

Unless you are planning on using this as a core piece of a service you are providing to customers, I would say your approach is fine. It may just have maintenance issues in the long run.

于 2012-10-11T05:12:10.330 回答
0

A possible way is injecting the channel registration (or other external task that need to be done within the NIO loop) to the selection loop, demo as bellow.

//private final Set<ExternalEvent> externalTaskEvents = ConcurrentHashMap.newKeySet();
//...

while (!Thread.currentThread().isInterrupted()) {
    try {
        selector.select();
    } catch (IOException ex) {
        ex.printStackTrace(Log.logWriter);
        return;
    }

    //handle external task events
    Iterator<ExternalEvent> eitr = externalTaskEvents.iterator();
    while (eitr.hasNext()) {
        ExternalEvent event = eitr.next();
        eitr.remove();
        if(event.task != null){
            event.task.accept(event);
        }
    }

    //handle NIO network events
    Iterator<SelectionKey> nitr = selector.selectedKeys().iterator();
    while (nitr.hasNext()) {
        SelectionKey key = nitr.next();
        nitr.remove();
        if (!key.isValid()) {
            continue;
        }
        try {
            if (key.isAcceptable()) {
                onAcceptable(key);
            } else if (key.isConnectable()) {
                onConnectable(key);
            } else {
                if (key.isReadable()) {
                    onReadable(key);
                }
                if (key.isWritable()) {
                    onWritable(key);
                }
            }
        } catch (IOException | InterruptedException | CancelledKeyException ex) {
            ex.printStackTrace(Log.logWriter);
            //...
        }
    }
}
于 2017-01-07T16:37:54.550 回答