我有一个消耗传入消息的verticle。每条消息都是一个 vertx JsonObject
,其中包含一个 vertx JsonArray
。我想为这个数组中的每个元素执行逻辑。逻辑本身包含在一个单独的 Verticle 中。这第二个垂直使用rxVertx
. 它定义了几个消费者,每个消费者都委托给单独的方法,所有这些方法都返回一个Observable
.
我的问题是:如何:
- 遍历每个元素
JsonArray
- 将每个元素传递给与
Observables
.
在第一个verticle中,尝试了以下方法:
EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list = array.getList();
Observable<Object> observable = Observable.fromArray(list);
observable.flatMapSingle(s -> {
eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe();
调用flatMapSingle
无法编译,因为:
The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})
这样做的正确方法是什么?非常感谢