0

我有一个相当复杂的 Rx 代码链,用于执行许多操作。本质上,当启动时,Observable 开始发出项目列表。对于每个项目,都会建立一个连接(这些项目是 BLE 设备)并写入一个特性。

此特性每 10 秒重写一次,直到发生错误(例如拔出电池)。之后,原始列表中的下一个项目被连接到,依此类推。

这是大多数动作发生的地方:

public Observable<String> connectForPolice(String name, RemovalListener listener) {

        StringBuilder builder = new StringBuilder(mAddress);
        builder.setCharAt(mAddress.length() - 1, '4');
        RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());

        return device.establishConnection(mContext, false)
                .timeout(12, TimeUnit.SECONDS)
                .doOnError(throwable -> {
                    Log.d(TAG, "Error thrown after timeout.");
                    throwable.printStackTrace();
                })
                .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(RxBleConnection rxBleConnection) {
                        byte[] value = new byte[1];
                        value[0] = (byte) (3 & 0xFF);

                        return Observable // ... we return an observable ...
                                .defer(() -> {
                                    return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
                                })
                                .repeatWhen(observable -> {
                                    return observable.delay(10, TimeUnit.SECONDS);
                                });
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .onErrorResumeNext(throwable -> {
                        throwable.printStackTrace();
                        listener.onFinished(name);
                    return Observable.empty();
                });
    }

这里有几个问题。可以看到,一个传统的监听器传入这个方法,这个监听器在onErrorResumeNext. 此侦听器用于从 Activity 中的 RecyclerView 中删除项目,但这并不重要。问题是,以这种方式使用监听器有点像 Rx 的反模式。以下是其余相关代码,包括调用上述示例的代码:

    @Override
    public void onFinished(String deviceName) {
        mAdapter.removeItem(deviceName);
    }

    private void startConnecting() {
        mIsBeepingAll = true;
        mConnectingSubscription = Observable.from(mAdapter.getItems())
                .flatMap((Func1<Device, Observable<?>>) device -> {
                    Log.d(TAG, "connecting for policing");
                    return device.connectForPolice(device.getName(), PoliceActivity.this);
                }, 1)
                .doOnError(throwable -> throwable.printStackTrace())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted...");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d(TAG, "onError...");
                    }

                    @Override
                    public void onNext(Object o) {
                        Log.d(TAG, "onNext... ");
                    }
                });
    }

包括监听器的实现。问题是,我怎样才能执行与听众使用 Rx 所做的等效的操作?

4

1 回答 1

1

最简单的方法是改变它:

            .onErrorResumeNext(throwable -> {
                throwable.printStackTrace();
                listener.onFinished(name);
                return Observable.empty();
            });

进入这个:

            .onErrorResumeNext(throwable -> {
                throwable.printStackTrace();
                return Observable.just(name);
            });

调用者将使用:

private void startConnecting() {
    mIsBeepingAll = true;
    mConnectingSubscription = Observable.from(mAdapter.getItems())
            .flatMap((Func1<Device, Observable<?>>) device -> {
                Log.d(TAG, "connecting for policing");
                return device.connectForPolice(device.getName(), PoliceActivity.this);
            }, 1)
            .doOnError(throwable -> throwable.printStackTrace())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    Log.d(TAG, "onCompleted...");
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    Log.d(TAG, "onError...");
                }

                @Override
                public void onNext(String deviceName) {
                    Log.d(TAG, "onNext... ");
                    mAdapter.removeItem(deviceName);
                }
            });
}
于 2016-12-14T20:38:31.940 回答