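So the way I use RxJava is to wrap it around Firebase RTD or Firestore; in this case it is Firestore, as shown below. Suppose you want to retrieve data from Firestore (the idea is almost the same for RTD, just use the Realtime Database instead):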
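/**
 * Retrieves a document snapshot of a particular document within the Firestore database.
 * @param collection the name of the collection (i.e. the table name)
 * @param document the ID of the document (i.e. the item id) within that collection
 * @return an Observable that emits a DocumentSnapshot each time the snapshot listener fires
 */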
@Override
public Observable<DocumentSnapshot> retrieveByDocument$(String collection, String document) {
    // Here I create the Observable and pass in an emitter as its lambda parameter.
    // An emitter simply allows me to call onNext if new data comes in, as well as emit errors or call onComplete.
    // It gives me control over the incoming data.
    Observable<DocumentSnapshot> observable = Observable.create((ObservableEmitter<DocumentSnapshot> emitter) -> {
        // Here I wrap the usual way of retrieving data from Firestore by passing in the collection (i.e. table name)
        // and document (i.e. item id), and I add a snapshot listener to listen for changes at that particular location.
        ListenerRegistration registration = mFirebaseFirestore.collection(collection).document(document)
                .addSnapshotListener((DocumentSnapshot documentSnapshot, FirebaseFirestoreException e) -> {
                    if (e != null) {
                        // If an error is thrown, I emit it using the emitter.
                        emitter.onError(e);
                    } else {
                        // If Firestore delivers a value, I use onNext to emit it to the observer,
                        // so a subscriber receives the incoming stream.
                        emitter.onNext(documentSnapshot);
                    }
                });
        // Remove the Firestore listener once the subscriber disposes, so it does not leak.
        // (ListenerRegistration is com.google.firebase.firestore.ListenerRegistration.)
        emitter.setCancellable(registration::remove);
    });
    // I finally return the observable so that I can subscribe to it later.
    return observable;
}
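The wrapping idea above (register a listener, forward each callback to the subscriber, and remove the listener when the subscriber is done) can be sketched without any Firebase or RxJava dependency. This is only a minimal model under stated assumptions: `FakeDocument`, `Registration`, and `demo` are hypothetical stand-ins invented for illustration, not part of Firestore or RxJava. `Registration` merely plays the role that a listener registration or a disposable would play in the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Dependency-free model of the listener-wrapping pattern. All names are hypothetical.
class ListenerWrapSketch {

    /** Stand-in for a handle that removes a previously added listener. */
    interface Registration {
        void remove();
    }

    /** Stand-in for a document that notifies its listeners whenever it changes. */
    static class FakeDocument {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        /** Registers a listener and returns a handle that unregisters it. */
        Registration addSnapshotListener(Consumer<String> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        /** Simulates a change arriving from the backend. */
        void publish(String value) {
            for (Consumer<String> listener : listeners) {
                listener.accept(value);
            }
        }

        int listenerCount() {
            return listeners.size();
        }
    }

    /** Subscribes, receives two values, unsubscribes, and shows a third value is not delivered. */
    static List<String> demo() {
        FakeDocument doc = new FakeDocument();
        List<String> received = new ArrayList<>();

        // "Subscribe": every callback is forwarded to the consumer (the onNext role).
        Registration registration = doc.addSnapshotListener(received::add);
        doc.publish("v1");
        doc.publish("v2");

        // "Dispose": removing the listener stops further deliveries.
        registration.remove();
        doc.publish("v3");

        if (doc.listenerCount() != 0) {
            throw new IllegalStateException("listener was not removed");
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [v1, v2]
    }
}
```

If the listener were never removed, the third value would still be delivered to a consumer that nobody is using anymore, which is the leak a cancellation hook on the emitter is meant to prevent.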