我有一些从硬件设备连续发出的数据。该数据首先需要连续发送到 web 服务 A,如果有足够的数据到达,该服务会在一段时间后返回结果流。然后,每个结果必须在到达 Web 服务 B 后立即转发到 Web 服务 B,如果来自 A 的足够结果到达,则该 Web 服务 B 会在一段时间后返回不同结果的流。每个 Web 服务都有一个异步回调风格的 API。在第一次将数据发送到每个 Web 服务之前,还需要进行一些连接设置。
如何将其映射到 RxJava?
flatMap
并且concatMap
是异步链接的主要工具。
您需要将您的网络服务包装成Future
s. 并将您的硬件设备转化为Observable
源代码。然后就很简单了:
class WebServices {
Future<Response1> callService1(parameters) { ... }
Future<Response2> callService2(parameters) { ... }
}
hardwareSource
.flatMap(v -> Observable.fromFuture(callService1(...)))
.flatMap(r1 -> Observable.fromFuture(callService2(...)))
.subscribe(r2 -> System.out.println(r2));
如果 web 服务接收和发送一系列消息,它们应该被包装到Observable
s. 处理管道看起来像:
class WebServices {
Observable<Response1> sendToService1(parameters) { ... }
Observable<Response2> sendToService2(parameters) { ... }
}
hardwareSource
.flatMap(v -> sendToService1(...))
.flatMap(r1 -> sendToService2(...))
.subscribe(r2 -> System.out.println(r2));
如果 web 服务的传入和传出流不严格相关(响应不直接与请求相关联),那么我会将这些服务实现为暴露Observer
和Observable
接口的类。
// wire them up
hardwareSource.getObservable()
.subscribe(webService1.getObserver());
webService1.getObservable()
.subscribe(webService2.getObserver());
webService2.getObservable()
.subscribe(resultHandler);
// initiate connections
webService2.connect()
webService1.connect()
hardwareSource.connect()