I'm rewriting a method in a library to use Rx. The original method is shown below.
public void connect(ConnectionListener connectionListener) {
    //......
    RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());
    mEstablishedConnection = device.establishConnection(mContext, false)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> {
                throwable.printStackTrace();
                connectionListener.onFailed();
                Log.d(TAG, "Error establishing a connection" + throwable.getMessage());
            })
            .subscribe(rxBleConnection -> {
                mConnection = rxBleConnection;
                connectionListener.onConnected();
                setNotifications();
                Log.d(TAG, "Connection established. Status: " + device.getConnectionState().toString());
            }, throwable -> {
                if (throwable != null) {
                    throwable.printStackTrace();
                }
            });
}
My first attempt was to return a Subscription instead of saving it to mEstablishedConnection. This would allow the caller to unsubscribe to trigger a disconnect:
public Subscription connect() {
    //..
    RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());
    return device.establishConnection(mContext, false)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> {
                throwable.printStackTrace();
                Log.d(TAG, "Error establishing a connection" + throwable.getMessage());
            })
            .flatMap(rxBleConnection -> {
                mConnection = rxBleConnection;
                // Note: Observable.empty() emits no items, so the onNext
                // callback passed to subscribe() below is never invoked.
                return Observable.empty();
            })
            .subscribe(o -> {
                setNotifications();
                Log.d(TAG, "Connection established. Status: " + device.getConnectionState().toString());
            }, throwable -> {
                // Errors are consumed here and never reach the caller.
                if (throwable != null) {
                    throwable.printStackTrace();
                }
            });
}
The problem with the above is that I cannot propagate errors back to the caller, which I would like to do. How can I rewrite this method to make it reactive, so that the caller receives errors, without simply returning an RxBleConnection, which is a third-party class?
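To make the goal concrete, here is a rough sketch of the shape I am after. It is written against the standard library's CompletableFuture rather than RxJava so that it runs on its own, and every name in it (ConnectSketch, ThirdPartyConnection, DeviceLink, establish, describe) is hypothetical: ThirdPartyConnection stands in for RxBleConnection, establish for establishConnection, and DeviceLink for some library-owned wrapper. The only point is that connect returns the pipeline instead of consuming it, so a failure reaches the caller and the third-party type stays hidden.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ConnectSketch {

    // Hypothetical stand-in for the third-party RxBleConnection.
    static final class ThirdPartyConnection {
        final String address;
        ThirdPartyConnection(String address) { this.address = address; }
    }

    // Hypothetical library-owned type exposed to callers instead.
    static final class DeviceLink {
        private final ThirdPartyConnection connection; // never leaves the library
        DeviceLink(ThirdPartyConnection connection) { this.connection = connection; }
        String address() { return connection.address; }
    }

    // Hypothetical stand-in for establishConnection: fails for an empty address.
    static CompletableFuture<ThirdPartyConnection> establish(String address) {
        CompletableFuture<ThirdPartyConnection> result = new CompletableFuture<>();
        if (address.isEmpty()) {
            result.completeExceptionally(new IllegalStateException("no device address"));
        } else {
            result.complete(new ThirdPartyConnection(address));
        }
        return result;
    }

    // Returns the pipeline without consuming it: the third-party type is
    // mapped to DeviceLink, and any failure travels on to the caller.
    static CompletableFuture<DeviceLink> connect(String address) {
        return establish(address).thenApply(DeviceLink::new);
    }

    // Caller side: both the success and the failure are visible here.
    static String describe(String address) {
        try {
            return "connected to " + connect(address).join().address();
        } catch (CompletionException e) {
            // join() wraps the original failure; unwrap it for reporting.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("AA:BB:CC:DD:EE:FF"));
        System.out.println(describe(""));
    }
}
```

I assume the Rx equivalent would return an Observable of the wrapper type (using map in place of thenApply) and leave the subscribe call, including the error handler, to the caller, who could then unsubscribe to disconnect. I have not verified that against the library.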