我在使用 Couchbase Java 客户端 2.2.2 和 Rx Observables 1.0.15 执行以下操作时遇到问题:
- 我有一个字符串列表,它们是文档名称
- 除了每个文档名称的原始文档外,我还想加载另一个文档(从原始文档名称推导出来),这样我会得到一对文档。如果这两个文档中的任何一个不存在,请不要再使用这对文档。
- 如果该对有效(即两个文档都存在),则使用这两个文档从它们创建自定义对象
- 将这些转换后的项目组合成一个列表
到目前为止我想出的东西看起来真的意味着:
List<E> resultList = new ArrayList<>();
Observable
.from(originalDocumentNames)
.flatmap(key -> {
Observable firstDocument = bucket.async().get(key);
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
return Observable.merge(firstDocument, secondDocument);
})
.reduce((jsonDocument1, jsonDocument2) -> {
if (jsonDocument1 == null || jsonDocument2 == null) {
return null;
}
resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
return null;
})
.filter(Objects.nonNull)
.singleOrDefault(null)
.subscribe(new Subscriber<E>() {
public void onComplete() {
//use resultList in a callback function
}
});
这不起作用。我不知道在哪里,但我认为我使用Observable.merge
了错误的方式。另外,我认为我以错误的方式解决了整个问题。
因此,主要问题似乎是:
- 如何向 Observable 流发出附加项目?
- 如何将两个项目减少为另一种类型的项目?(reduce(T, T, T) 不允许这样做)
- 我拿错了吗?