0

我在使用 Couchbase Java 客户端 2.2.2 和 Rx Observables 1.0.15 执行以下操作时遇到问题:

  • 我有一个字符串列表,它们是文档名称
  • 除了每个文档名称的原始文档外,我还想加载另一个文档(从原始文档名称推导出来),这样我会得到一对文档。如果这两个文档中的任何一个不存在,请不要再使用这对文档。
  • 如果该对有效(即两个文档都存在),则使用这两个文档从它们创建自定义对象
  • 将这些转换后的项目组合成一个列表

到目前为止我想出的东西看起来真的意味着:

List<E> resultList = new ArrayList<>();

Observable
    .from(originalDocumentNames)
    .flatmap(key -> {
        Observable firstDocument = bucket.async().get(key);
        Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
        return Observable.merge(firstDocument, secondDocument);
    })
    .reduce((jsonDocument1, jsonDocument2) -> {
        if (jsonDocument1 == null || jsonDocument2 == null) {
            return null;
        }
        resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
        return null;
    })
    .filter(Objects.nonNull)
    .singleOrDefault(null)
    .subscribe(new Subscriber<E>() {
        public void onComplete() {
            //use resultList in a callback function
        }
    });

这不起作用。我不知道在哪里,但我认为我使用Observable.merge了错误的方式。另外,我认为我以错误的方式解决了整个问题。

因此,主要问题似乎是:

  • 如何向 Observable 流发出附加项目?
  • 如何将两个项目减少为另一种类型的项目?(reduce(T, T, T) 不允许这样做)
  • 我拿错了吗?
4

2 回答 2

4

您可以zip 平面图中使用。ObservableZip 将发出与最少项目一样多的项目。因此,如果缺少其中一个文档,则其序列将为空,并且 zip 将跳过它。

Observable
.from(originalDocumentNames)
.flatmap(key -> {
    //the stream of 0-1 original document
    Observable firstDocument = bucket.async().get(key);
    //the stream of 0-1 associated document
    Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));

    //using zip and the createCustomObject method reference as a zip function to combine pairs of documents
    return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
})
.toList() //let RxJava aggregate into a List
.subscribe(
    //the "callback" function, onNext will be called only once with toList
    list -> doSomething(list), 
    //always try to define onError (best practice)
    e -> processErrors(e)
);
于 2015-12-14T11:08:38.530 回答
1

这段代码有几个问题:

  1. 副作用,reduce操作是添加到Observable链外的列表,这是错误的。reduce应该返回列表或根本不存在,因为 Rx 有一个操作toList。也因为返回null下一个操作的reduce操作必须处理它;这是相当不雅的。

  2. merge操作错误,您应该改为zipflatmap并构建对/聚合。

  3. 可选点:如果任一 get 操作将返回多个项目,则平面图操作将无法处理(也许事实上couchbase就是这种情况)

注意我没有 IDE,所以暂时没有代码。merge但在我看来,替换zip和删除reduce肯定会有所帮助。

于 2015-12-14T11:20:14.123 回答