我正在使用 RxJava 和 RxNetty 为 Apache Mesos 的新 HTTP 调度程序 API 编写客户端。
我已经成功地创建了与 RxNetty 的连接,并Observable<Event>从生成的分块流中创建了一个。
现在我正在尝试对一个接收器进行建模,该接收器可用于将调用发送回 Mesos,以声明/拒绝资源提供、确认任务状态更新等。
将要发送到 Mesos 的消息是 a Call,我需要能够为进入 Sink的每个消息提供一个onCompletedor 。这是由于 Mesos 对发送给它的数据执行同步验证。onErrorCallCall
我基本上是在尝试允许以下内容:
final MesosSchedulerClient client = new MesosSchedulerClient();
final Observable<Event> events = client.openEventStream(subscribeCall);
final Observable<Observable<Call>> ackCalls = events
.filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid())
.zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> {
final TaskStatus status = e.getUpdate().getStatus();
final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId());
return Observable.just(ackCall)
.doOnComplete(() -> { ... })
.doOnError((e) -> { ... });
});
client.sink(ackCalls);
现在我想出了一个自定义对象[1],它扩展了 Subject 并指定了Calland Action0foronCompleted和Action1<Throwable>for onError。不过,如果可能的话,我更愿意使用 RxJava 中的现有结构。我想出的示例用法[2]。
任何指导将不胜感激。