I have a list of BLE devices, and am using RxJava to interact with them. I need to emit an item from the list, write a characteristic to it repeatedly until X happens, and then proceed to the next item in the list.

Current code:

Observable.from(mDevices)
                .flatMap(new Func1<Device, Observable<?>>() {
                    @Override
                    public Observable<?> call(Device device) {
                        Log.d(TAG, "connecting for policing");
                        return device.connectForPolicing();
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        Log.d(TAG, "subscribing... ");
                    }
                });

where .connectForPolicing() looks like:

public Observable<byte[]> connectForPolice() {

        ....

        return device.establishConnection(mContext, false)
                .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(RxBleConnection rxBleConnection) {
                        byte[] value = new byte[1];
                        value[0] = (byte) (3 & 0xFF);
                        //Buzz the device
                        return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
                    }
                })
                .repeat(3)//ignore
                .takeUntil(device.observeConnectionStateChanges().filter(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() {
                    @Override
                    public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) {

                        return rxBleConnectionState == RxBleConnection.RxBleConnectionState.DISCONNECTING;
                    }
                }));
    }

This code seems to immediately emit all the items in the list, and therefore will connect and buzz all items at the same time. How can I emit items one at a time so that I may interact with them?

The pseudocode would be something like:

for(Device device : devices) {
    device.connect();
    while(device.isConnected()) {
        device.beep();
    }
}

2 Answers

Replace flatMap with concatMap:

.concatMap(device -> device.connectForPolicing())

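flatMap uses the merge operator, so it subscribes to every inner Observable immediately and all devices are handled at once. concatMap uses concat, so it subscribes to the inner Observables one at a time, in order. There is a good article about this difference.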
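
To make the ordering difference concrete, here is a minimal plain-Java model of the two subscription strategies. It deliberately does not use RxJava, and it is not how RxJava implements merge or concat; FakeDevice, mergeAll, concatAll, and run are hypothetical names invented for this sketch. It only shows when each "device" gets subscribed to (connected) relative to the completion (disconnect) of the previous one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class SubscriptionOrderSketch {

    /** Hypothetical stand-in for one device's Observable. */
    static final class FakeDevice {
        private final String name;
        private final List<String> log;
        private Runnable onCompleted;

        FakeDevice(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        /** Subscribing models "connect"; the callback fires on completion. */
        void subscribe(Runnable onCompleted) {
            log.add("connect " + name);
            this.onCompleted = onCompleted;
        }

        /** Models the device disconnecting, which completes the stream. */
        void disconnect() {
            log.add("disconnect " + name);
            Runnable callback = onCompleted;
            onCompleted = null;
            if (callback != null) {
                callback.run();
            }
        }
    }

    /** merge-like: subscribe to every source up front. */
    static void mergeAll(List<FakeDevice> devices) {
        for (FakeDevice device : devices) {
            device.subscribe(() -> { });
        }
    }

    /** concat-like: subscribe to the next source only after the previous one completes. */
    static void concatAll(List<FakeDevice> devices) {
        subscribeNext(new ArrayDeque<>(devices));
    }

    private static void subscribeNext(Queue<FakeDevice> pending) {
        FakeDevice next = pending.poll();
        if (next != null) {
            next.subscribe(() -> subscribeNext(pending));
        }
    }

    /** Runs two fake devices through one of the strategies and returns the event log. */
    public static List<String> run(boolean sequential) {
        List<String> log = new ArrayList<>();
        FakeDevice a = new FakeDevice("A", log);
        FakeDevice b = new FakeDevice("B", log);
        List<FakeDevice> devices = Arrays.asList(a, b);
        if (sequential) {
            concatAll(devices);
        } else {
            mergeAll(devices);
        }
        a.disconnect();
        b.disconnect();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("merge-like:  " + run(false));
        System.out.println("concat-like: " + run(true));
    }
}
```

In the merge-like run both devices are connected before either disconnects; in the concat-like run device B is connected only after device A has disconnected, which matches the for-loop pseudocode in the question.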

answered 2016-11-18T00:00:01.930

You can use the flatMap(Func1, int) overload.

Observable.from(mDevices)
            .flatMap(
                new Func1<Device, Observable<?>>() {
                    @Override
                    public Observable<?> call(Device device) {
                        Log.d(TAG, "connecting for policing");
                        return device.connectForPolicing();
                    }
                },
                1
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Object>() {
                @Override
                public void call(Object o) {
                    Log.d(TAG, "subscribing... ");
                }
            });

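The int parameter limits the maximum number of concurrent subscriptions. With a value of 1, the devices are processed sequentially.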
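
As a rough illustration of what a concurrency limit does, the following plain-Java sketch keeps at most maxConcurrent "devices" connected at a time and connects the next pending one whenever a slot frees up. It is a simplified model under stated assumptions, not RxJava code and not RxJava's actual implementation; MaxConcurrentSketch, complete, and run are hypothetical names used only here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class MaxConcurrentSketch {

    private final Queue<String> pending;
    private final int maxConcurrent;
    private final List<String> log = new ArrayList<>();
    private int active = 0;

    MaxConcurrentSketch(List<String> deviceNames, int maxConcurrent) {
        this.pending = new ArrayDeque<>(deviceNames);
        this.maxConcurrent = maxConcurrent;
        drain();
    }

    /** Connects pending devices until the concurrency limit is reached. */
    private void drain() {
        while (active < maxConcurrent && !pending.isEmpty()) {
            active++;
            log.add("connect " + pending.poll());
        }
    }

    /** Models a connected device finishing; frees a slot for the next one. */
    void complete(String deviceName) {
        log.add("disconnect " + deviceName);
        active--;
        drain();
    }

    List<String> log() {
        return log;
    }

    /** Runs three fake devices A, B, C with the given limit and returns the event log. */
    public static List<String> run(int maxConcurrent) {
        MaxConcurrentSketch sketch =
                new MaxConcurrentSketch(Arrays.asList("A", "B", "C"), maxConcurrent);
        sketch.complete("A");
        sketch.complete("B");
        sketch.complete("C");
        return sketch.log();
    }

    public static void main(String[] args) {
        System.out.println("limit 1: " + run(1));
        System.out.println("limit 2: " + run(2));
    }
}
```

With a limit of 1 each device connects only after the previous one has disconnected. With a limit of 2, A and B connect together and C connects as soon as A disconnects.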

If you want the device to beep repeatedly until it disconnects, the connectForPolice() function also needs to change:

public Observable<byte[]> connectForPolice(RxBleDevice device) {

    ....

    return device.establishConnection(mContext, false)
            .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() { // once the connection is established ...
                @Override
                public Observable<byte[]> call(final RxBleConnection rxBleConnection) {
                    final byte[] value = new byte[1];
                    value[0] = (byte) (3 & 0xFF);
                    //Buzz the device
                    return Observable // ... we return an observable ...
                            .defer(new Func0<Observable<byte[]>>() {
                                @Override
                                public Observable<byte[]> call() {
                                    return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value); // ... (that on each subscription will emit a fresh write characteristic observable) ...
                                }
                            })
                            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { // ... which we will subscribe (the Observable.defer()) again ...
                                @Override
                                public Observable<?> call(Observable<? extends Void> observable) {
                                    return observable.delay(10, TimeUnit.SECONDS); // ... after 10 seconds from the previous complete
                                }
                            });
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<? extends byte[]>>() { // if the device will trigger disconnect then a BleDisconnectedException will be thrown ...
                @Override
                public Observable<? extends byte[]> call(Throwable throwable) {
                    return Observable.empty(); // ... in which situation we will just finish the Observable
                }
            });
}
answered 2016-11-18T07:48:10.580