0

I'm rewriting a method in a library to utilize Rx. The below code example is the original method.

public void connect(ConnectionListener connectionListener) {

    //......

    RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());

    mEstablishedConnection = device.establishConnection(mContext, false)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> {
                throwable.printStackTrace();
                connectionListener.onFailed();
                Log.d(TAG, "Error establishing a connection" + throwable.getMessage());
            })
            .subscribe(rxBleConnection -> {
                mConnection = rxBleConnection;
                connectionListener.onConnected();
                setNotifications();
                Log.d(TAG, "Connection established. Status: " + device.getConnectionState().toString());
            }, throwable -> {
                if (throwable != null) {
                    throwable.printStackTrace();
                }
            });
}

My first go was to return a Subscription instead of saving it to mEstablishedConnection. This would allow the user to unsubscribe to trigger a disconnect:

public Subscription connect() {

    //..

    RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());

    return device.establishConnection(mContext, false)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> {
                throwable.printStackTrace();
                Log.d(TAG, "Error establishing a connection" + throwable.getMessage());
            })
            .flatMap(rxBleConnection -> {
                mConnection = rxBleConnection;
                return Observable.empty();
            })
            .subscribe(o -> {
                setNotifications();
                Log.d(TAG, "Connection established. Status: " + device.getConnectionState().toString());
            }, throwable -> {
                if (throwable != null) {
                    throwable.printStackTrace();
                }
            });
}

The issue with the above is that I can not properly propagate errors back to the caller, which would be nice to do. How can I rewrite this method to make it reactive, letting the caller receive errors, and not just return an RxBleConnection, which is a 3rd party class?

4

1 回答 1

1

正确的答案主要取决于您要实现的接口。您可以将内部包装RxBleConnection在您自己的界面中。它可能是这样的:

public Observable<YourWrapperClass> connect() {
    RxBleDevice device = // ...
    return device.establishConnection(context, false)
        .doOnNext(connection -> {
            setNotifications(connection) // pass the connection to setNotifications()
            mConnection = connection // store the mConnection if it is a must - in the reactive approach objects are meant to be passed instead of stored 
        })
        .map(connection -> new YourWrapperClass(connection))
}

在这里,调用者将负责订阅Observable(因此将能够取消订阅)并且不会知道内部(RxBleConnection)。

于 2016-12-02T21:29:47.063 回答