4

我在 Java Web 应用程序中有一个分层架构。UI 层只是 Java,服务是类型化的 Akka actor,外部服务调用(WS、DB 等)被包装在 Hystrix 命令中。

UI 调用服务,服务返回 Akka 未来。这是一个 Akka 未来,因为我想通过 Akka 未来提供的 onComplete 和 onFailure 回调来简化 UI 编码。然后该服务创建执行一些映射等的未来,并包装对返回 Java 未来的 HystrixCommand 的调用。

所以在伪代码中:

用户界面

AkkaFuture future = service.getSomeData();

服务

public AkkaFuture getSomeData() {
    return future {
        JavaFuture future = new HystrixCommand(mapSomeData()).queue()
        //what to do here, currently just return future.get()
    }
}

问题是我想释放服务参与者正在使用的线程,而只是绑定 Hystrix 使用的线程。但是 java 未来会阻止这种情况,因为我必须阻止它的完成。我能想到的唯一选择(我不确定我是否喜欢)是不断轮询 Java 未来并在 Java 未来完成时完成 Akka 未来。

注意:这个问题与 Hystrix 本身并没有真正的关系,但如果有人想出一个与 Hystrix 相关的解决方案,我决定提一下。

4

3 回答 3

3

我将@Hbf 的答案标记为解决方案,因为我最终做了一个 Akka 轮询器,如如何在 Akka Future 中包装 java.util.concurrent.Future?. 作为参考,我还尝试过:

  • 创建一个 HystrixCommandExcutionHook 并扩展 HystrixCommand 以允许回调。这不起作用,因为没有在正确的时间调用钩子。
  • 通过装饰的执行器在 Hystrix 中创建期货,然后从命令中投射期货来使用 Guavas 可听的未来。不起作用,因为 Hystrix 使用了无法修饰的 ThreadPoolExecutor。

编辑:我在下面添加 Akka poller 代码,因为原始答案是在 Scala 中,如果 Java 未来不能很好地取消,它会挂起。下面的解决方案总是在超时后离开线程。


    protected  Future wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future javaFuture, final Option maybeTimeout, final ActorSystem actorSystem) {
      final Promise promise = Futures.promise();
        if (maybeTimeout.isDefined()) {
          pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem);
        } else {
          pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option. none(), actorSystem);
        }

        return promise.future();
    }

    protected  void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future javaFuture, final Promise promise, final Option maybeTimeout, final ActorSystem actorSystem) {
      if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) {
        // on timeouts, try to cancel the Java future and simply walk away
        javaFuture.cancel(true);
        promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get())));

      } else if (javaFuture.isDone()) {
        try {
          promise.success(javaFuture.get());
        } catch (final Exception e) {
          promise.failure(e);
        }
      } else {
            actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
          @Override
          public void run() {
            pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem);
          }
        }, actorSystem.dispatcher());
      }
    }
于 2013-03-13T18:57:44.680 回答
2

众所周知,Java 期货在设计上不如 Scala 期货。例如,看一下“How do I wrap a java.util.concurrent.Future in an Akka Future”的讨论。

但是:也许,Hystrix 提供了某种onComplete回调而不是轮询(如上面讨论中所建议的)?我根本不知道这个库,但偶然发现onComplete了 Hystrix API 中的一个。也许它有帮助?

于 2013-02-27T23:29:59.987 回答
2

从 Hystrix 1.3 开始,它现在还支持真正的非阻塞回调,这将更适合 Akka/Scala 非阻塞和可组合的未来行为:https ://github.com/Netflix/Hystrix/wiki/How-To -使用#wiki-Reactive-Execution

于 2013-10-01T21:36:57.480 回答