0

我正在使用 RxJava 和 RxNetty 为 Apache Mesos 的新 HTTP 调度程序 API 编写客户端。

我已经成功地创建了与 RxNetty 的连接,并Observable<Event>从生成的分块流中创建了一个。

现在我正在尝试对一个接收器进行建模,该接收器可用于将调用发送回 Mesos,以声明/拒绝资源提供、确认任务状态更新等。

将要发送到 Mesos 的消息是 a Call,我需要能够为进入 Sink的每个消息提供一个onCompletedor 。这是由于 Mesos 对发送给它的数据执行同步验证。onErrorCallCall

我基本上是在尝试允许以下内容:

final MesosSchedulerClient client = new MesosSchedulerClient();
final Observable<Event> events = client.openEventStream(subscribeCall);

final Observable<Observable<Call>> ackCalls = events
    .filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid())
    .zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> {
        final TaskStatus status = e.getUpdate().getStatus();
        final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId());
        return Observable.just(ackCall)
            .doOnComplete(() -> { ... })
            .doOnError((e) -> { ... });
    });

client.sink(ackCalls);

现在我想出了一个自定义对象[1],它扩展了 Subject 并指定了Calland Action0foronCompletedAction1<Throwable>for onError。不过,如果可能的话,我更愿意使用 RxJava 中的现有结构。我想出的示例用法[2]。

任何指导将不胜感激。

[1] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-core/src/main/java/org/apache/mesos/rx/java/SinkOperation.java#L17

[2] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-example/mesos-rxjava-example-framework/src/main/java/org/apache/mesos/rx /java/example/framework/sleepy/Main.java#L117-L124

4

1 回答 1

0

我最终得到的解决方案是创建一个自定义订阅者,它将处理事件流并将请求发送回 mesos。

于 2015-11-12T19:59:03.353 回答