我有一个相当复杂的 Rx 代码链,用于执行许多操作。本质上,当启动时,Observable 开始发出项目列表。对于每个项目,都会建立一个连接(这些项目是 BLE 设备)并写入一个特性。
此特性每 10 秒重写一次,直到发生错误(例如拔出电池)。之后,原始列表中的下一个项目被连接到,依此类推。
这是大多数动作发生的地方:
public Observable<String> connectForPolice(String name, RemovalListener listener) {
StringBuilder builder = new StringBuilder(mAddress);
builder.setCharAt(mAddress.length() - 1, '4');
RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());
return device.establishConnection(mContext, false)
.timeout(12, TimeUnit.SECONDS)
.doOnError(throwable -> {
Log.d(TAG, "Error thrown after timeout.");
throwable.printStackTrace();
})
.flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(RxBleConnection rxBleConnection) {
byte[] value = new byte[1];
value[0] = (byte) (3 & 0xFF);
return Observable // ... we return an observable ...
.defer(() -> {
return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
})
.repeatWhen(observable -> {
return observable.delay(10, TimeUnit.SECONDS);
});
}
})
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext(throwable -> {
throwable.printStackTrace();
listener.onFinished(name);
return Observable.empty();
});
}
这里有几个问题。可以看到,一个传统的监听器传入这个方法,这个监听器在onErrorResumeNext
. 此侦听器用于从 Activity 中的 RecyclerView 中删除项目,但这并不重要。问题是,以这种方式使用监听器有点像 Rx 的反模式。以下是其余相关代码,包括调用上述示例的代码:
@Override
public void onFinished(String deviceName) {
mAdapter.removeItem(deviceName);
}
private void startConnecting() {
mIsBeepingAll = true;
mConnectingSubscription = Observable.from(mAdapter.getItems())
.flatMap((Func1<Device, Observable<?>>) device -> {
Log.d(TAG, "connecting for policing");
return device.connectForPolice(device.getName(), PoliceActivity.this);
}, 1)
.doOnError(throwable -> throwable.printStackTrace())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted...");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.d(TAG, "onError...");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "onNext... ");
}
});
}
包括监听器的实现。问题是,我怎样才能执行与听众使用 Rx 所做的等效的操作?