我将@Hbf 的答案标记为解决方案,因为我最终做了一个 Akka 轮询器,如如何在 Akka Future 中包装 java.util.concurrent.Future?. 作为参考,我还尝试过:
- 创建一个 HystrixCommandExcutionHook 并扩展 HystrixCommand 以允许回调。这不起作用,因为没有在正确的时间调用钩子。
- 通过装饰的执行器在 Hystrix 中创建期货,然后从命令中投射期货来使用 Guavas 可听的未来。不起作用,因为 Hystrix 使用了无法修饰的 ThreadPoolExecutor。
编辑:我在下面添加 Akka poller 代码,因为原始答案是在 Scala 中,如果 Java 未来不能很好地取消,它会挂起。下面的解决方案总是在超时后离开线程。
protected Future wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future javaFuture, final Option maybeTimeout, final ActorSystem actorSystem) {
final Promise promise = Futures.promise();
if (maybeTimeout.isDefined()) {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem);
} else {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option. none(), actorSystem);
}
return promise.future();
}
protected void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future javaFuture, final Promise promise, final Option maybeTimeout, final ActorSystem actorSystem) {
if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) {
// on timeouts, try to cancel the Java future and simply walk away
javaFuture.cancel(true);
promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get())));
} else if (javaFuture.isDone()) {
try {
promise.success(javaFuture.get());
} catch (final Exception e) {
promise.failure(e);
}
} else {
actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
@Override
public void run() {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem);
}
}, actorSystem.dispatcher());
}
}