0

我正在研究使用 RxJava2 进行反应式编程,并且我对它与 MongoDB 等异步数据库驱动程序的使用有疑问。

如果我使用阻塞 MongoDB 驱动程序来获取集合,则方法是这样的:

public class MyDao{
   ...
   public Document getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      return collection.find().first();
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return Observable.just(myDao.getFirstDocument(collectionName)); 
   }
}

相反,使用 MongoDB 的异步驱动程序,我的读取操作的返回类型是 void(而不是 Document 或 Future),其中包含回调方法,例如:

collection.find().first(
        (document, throwable) -> {
            myService.myCallback(document);
        }
);

那么,如何将我的 Observable Documents 传递给 MyService?

public class MyDao{
   ...
   public void getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      collection.find().first(
        (document, throwable) -> {
            //SOME SORT OF CALLBACK
        }
     );
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return ??????? 
   }
}
4

1 回答 1

1

当你在Observable.just()使用

public Observable<Document> getFirstOf(String collectionName){
    return Observable.just(myDao.getFirstDocument(collectionName)); 
}

它等于下一个代码

public Observable<Document> getFirstOf(String collectionName){
    Document doc = myDao.getFirstDocument(collectionName);
    return Observable.just(doc); 
}

您会注意到它不是async代码,并且对 DB 的请求是在调用线程上执行的。要制作该代码,async您需要像这样重写它

public Observable<Document> getFirstOf(String collectionName){
    return Observable.fromCallable(() -> myDao.getFirstDocument(collectionName)); 
}

如果您正在使用asyncMongoDB 驱动程序并希望将其包装在其中Observable,则可以这样编写

public Observable<Document> getFirstDocument(String collectionName) {
    return Observable.create(emitter -> {
        MongoCollection<Document> collection = database.getCollection(collectionName);
        collection.find().first((document, throwable) -> {
            if(document != null) {
                emitter.onNext(document);
                emitter.onComplete();
            } else if(throwable != null) {
                emitter.onError(throwable);
            }
        });
    });
}
于 2018-11-02T20:22:28.863 回答