0

我有一个消耗传入消息的verticle。每条消息都是一个 vertx JsonObject,其中包含一个 vertx JsonArray。我想为这个数组中的每个元素执行逻辑。逻辑本身包含在一个单独的 Verticle 中。这第二个垂直使用rxVertx. 它定义了几个消费者,每个消费者都委托给单独的方法,所有这些方法都返回一个Observable.

我的问题是:如何:

  1. 遍历每个元素JsonArray
  2. 将每个元素传递给与Observables.

在第一个verticle中,尝试了以下方法:

EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list  = array.getList();
Observable<Object> observable = Observable.fromArray(list);

observable.flatMapSingle(s -> {
      eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
   }).subscribe();

调用flatMapSingle无法编译,因为:

The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})

这样做的正确方法是什么?非常感谢

4

1 回答 1

1

如果flatMapSingle使用代码块定义函数参数,则必须使用return关键字:

observable.flatMapSingle(s -> {
  return eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe(reply -> {
  // Handle each reply
});

请注意,flatMapSingle这并不保证回复与传入消息的顺序相同。如果您需要该保证,请使用concatMapSingle.

于 2019-06-05T13:38:19.933 回答