0

我有一些从硬件设备连续发出的数据。该数据首先需要连续发送到 web 服务 A,如果有足够的数据到达,该服务会在一段时间后返回结果流。然后,每个结果必须在到达 Web 服务 B 后立即转发到 Web 服务 B,如果来自 A 的足够结果到达,则该 Web 服务 B 会在一段时间后返回不同结果的流。每个 Web 服务都有一个异步回调风格的 API。在第一次将数据发送到每个 Web 服务之前,还需要进行一些连接设置。

如何将其映射到 RxJava?

4

1 回答 1

2

flatMap并且concatMap是异步链接的主要工具。

您需要将您的网络服务包装成Futures. 并将您的硬件设备转化为Observable源代码。然后就很简单了:

class WebServices {
    Future<Response1> callService1(parameters) { ... }
    Future<Response2> callService2(parameters) { ... }
}

hardwareSource
    .flatMap(v -> Observable.fromFuture(callService1(...)))
    .flatMap(r1 -> Observable.fromFuture(callService2(...)))
    .subscribe(r2 -> System.out.println(r2));

如果 web 服务接收和发送一系列消息,它们应该被包装到Observables. 处理管道看起来像:

class WebServices {
    Observable<Response1> sendToService1(parameters) { ... }
    Observable<Response2> sendToService2(parameters) { ... }
}

hardwareSource
    .flatMap(v -> sendToService1(...))
    .flatMap(r1 -> sendToService2(...))
    .subscribe(r2 -> System.out.println(r2));

如果 web 服务的传入和传出流不严格相关(响应不直接与请求相关联),那么我会将这些服务实现为暴露ObserverObservable接口的类。

// wire them up
hardwareSource.getObservable()
    .subscribe(webService1.getObserver());
webService1.getObservable()
    .subscribe(webService2.getObserver());
webService2.getObservable()
    .subscribe(resultHandler);

// initiate connections
webService2.connect()
webService1.connect()
hardwareSource.connect()
于 2017-01-18T13:39:10.907 回答