15

在 vert.x 中,我可以向另一个 Verticle 发送消息并“异步等待”回复。

问题是:我想向多个 Verticle 发送消息,并在所有 Verticle 回复时调用一个异步处理程序。

这是可能的还是有更好的设计来实现这个功能?

编辑:

假设我有一个顶点 A,它向顶点 B、C 和 D 发送消息。每个顶点(B、C、D)对消息执行一些操作并返回 A 一些数据。verticle A 然后接收来自 B、C、D 的响应并对所有数据进行处理。问题是我发送的每条消息都有一个处理程序(一个用于 A,一个用于 B,一个用于 C),我希望在所有回复到达时调用一个处理程序。

4

2 回答 2

17

从 Vert.x 3.2 开始,文档解释了如何使用Future和进行异步协调CompositeFuture

因此,假设您想send通过事件总线进行两次调用,并在两者都成功时执行某些操作:

Future<Message> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());

Future<Message> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());

CompositeFuture.all(f1, f2).setHandler(result -> {
  // business as usual
});

最多可以将 6 个期货作为参数传递,或者它们可以作为列表传递。

于 2016-06-20T16:43:10.107 回答
3

最好的方法是使用响应式扩展,由Netflix 的 Rx.Java实现并由RxVertx 模块提供。

种类繁多的运算符使您可以执行诸如将多个异步调用的结果“压缩”到新结果中之类的操作,并且可以随心所欲地使用它。

在 GitHub 上有一个简单的演示,其中包含:

final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);

这个片段展示了来自两个事件总线异步交互的两个 observables 如何被“压缩”(即合并)到一个最终的 HTTP 响应中。

于 2014-08-23T22:32:06.053 回答