这里有两个问题:一个与您尝试编写代码的方式有关的小问题,以及一个较大的问题,其中包含许多提供异步服务调用的库,但没有在像这样的异步框架中充分利用它们的好方法网状。这迫使用户使用像这样的次优黑客攻击,或者是一种不太糟糕但仍然不是理想的方法,我稍后会谈到。
首先是编码问题。问题是您试图从与您的处理程序关联的线程以外的线程调用 ChannelHandlerContext 方法,这是不允许的。这很容易修复,如下所示。您可以通过其他几种方式对其进行编码,但这可能是最简单的:
private static ExecutorService pool = Executors.newFixedThreadPool(20);
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
//...
final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());
// first wait for the response on a pool thread
pool.execute(new Runnable() {
public void run() {
String value;
Exception err;
try {
value = future.get(3, TimeUnit.SECONDS); // or whatever timeout you want
err = null;
} catch (Exception e) {
err = e;
value = null;
}
// put results into final variables; compiler won't let us do it directly above
final fValue = value;
final fErr = err;
// now process the result on the ChannelHandler's thread
ctx.executor().execute(new Runnable() {
public void run() {
handleResult(fValue, fErr);
}
});
}
});
// note that we drop through to here right after calling pool.execute() and
// return, freeing up the handler thread while we wait on the pool thread.
}
private void handleResult(String value, Exception err) {
// handle it
}
这将起作用,并且可能足以满足您的应用程序。但是你有一个固定大小的线程池,所以如果你要处理超过 20 个并发连接,那将成为一个瓶颈。您可以增加池大小,或使用无界的池,但此时,您还不如在 Tomcat 下运行,因为内存消耗和上下文切换开销开始成为问题,并且您失去了吸引网易第一!
问题是,Spymemcached 是基于 NIO 的、事件驱动的,并且只使用一个线程来完成所有工作,但无法充分利用其事件驱动的特性。我希望他们很快就会解决这个问题,就像 Netty 4 和 Cassandra 最近通过在 Future 对象上提供回调(侦听器)方法一样。
同时,和你在同一条船上,我研究了替代方案,但对我的发现不太满意,我(昨天)写了一个 Future 跟踪器类,它可以以可配置的速率轮询多达数千个 Futures,然后给你打电话完成后返回您选择的线程(执行程序)。它只使用一个线程来执行此操作。如果您想尝试一下,我已经将它放在 GitHub 上,但请注意,正如他们所说,它仍然是湿的。我在过去的一天里对它进行了很多测试,即使有 10000 个并发模拟 Future 对象,每毫秒轮询一次,它的 CPU 利用率也可以忽略不计,尽管它开始超过 10000。使用它,上面的示例看起来像这样:
// in some globally-accessible class:
public static final ForeignFutureTracker FFT = new ForeignFutureTracker(1, TimeUnit.MILLISECONDS);
// in a handler class:
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
// ...
final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());
// add a listener for the Future, with a timeout in 2 seconds, and pass
// the Executor for the current context so the callback will run
// on the same thread.
Global.FFT.addListener(future, 2, TimeUnit.SECONDS, ctx.executor(),
new ForeignFutureListener<String,GetFuture<String>>() {
public void operationSuccess(String value) {
// do something ...
ctx.fireChannelRead(someval);
}
public void operationTimeout(GetFuture<String> f) {
// do something ...
}
public void operationFailure(Exception e) {
// do something ...
}
});
}
您不希望任何时候有超过一个或两个 FFT 实例处于活动状态,否则它们可能会消耗 CPU。但是单个实例可以处理数千个未完成的 Futures;拥有第二个的唯一原因是以较慢的轮询速率(例如 10-20 毫秒)处理更高延迟的呼叫,例如 S3。
轮询方法的一个缺点是它增加了少量延迟。例如,每毫秒轮询一次,平均会增加 500 微秒的响应时间。这对于大多数应用程序来说都不是问题,而且我认为这不仅仅是通过线程池方法节省的内存和 CPU 来抵消。
我预计在一年左右的时间里这将不是问题,因为更多异步客户端提供回调机制,让您充分利用 NIO 和事件驱动模型。