我正在研究使用 RxJava2 进行反应式编程,并且我对它与 MongoDB 等异步数据库驱动程序的使用有疑问。
如果我使用阻塞 MongoDB 驱动程序来获取集合,则方法是这样的:
public class MyDao{
...
public Document getFirstDocument(String collectionName){
MongoCollection<Document> collection = database.getCollection(collectionName);
return collection.find().first();
}
}
public class MyService {
...
public Observable<Document> getFirstOf(String collectionName){
return Observable.just(myDao.getFirstDocument(collectionName));
}
}
相反,使用 MongoDB 的异步驱动程序,我的读取操作的返回类型是 void(而不是 Document 或 Future),其中包含回调方法,例如:
collection.find().first(
(document, throwable) -> {
myService.myCallback(document);
}
);
那么,如何将我的 Observable Documents 传递给 MyService?
public class MyDao{
...
public void getFirstDocument(String collectionName){
MongoCollection<Document> collection = database.getCollection(collectionName);
collection.find().first(
(document, throwable) -> {
//SOME SORT OF CALLBACK
}
);
}
}
public class MyService {
...
public Observable<Document> getFirstOf(String collectionName){
return ???????
}
}