0

上下文使用 Couchbase 在 2 级文档存储上实现 REST CRUD 服务。数据模型是指向零个或多个项目文档的索引文档。索引文档使用异步获取作为 Observable 检索。接下来是一个 .flatMap() ,它为每个项目文档检索零个或多个 ID。异步获取返回一个 Observable,所以现在我创建的 Observable 是 Observable>。我想链接一个 .merge() 运算符,它将采用“发出 Observables 的 Observable,并将其输出合并到单个 Observable 的输出”来引用 ReactiveX 文档:) 然后我将 .subscribe() 到那个单一的可观察以检索项目文档。.merge() 运算符有很多签名,但我不知道如何在一系列运算符中使用它,如下所示:

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create Observable that gets and emits zero or more 
        // Observable<Observable<JsonDocument>> ie. bucket.async().gets
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                itemArray.forEach(
                    (jsonObject) -> {
                        String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                        observer.onNext( 
                            bucket.async().get(itemId)
                        );
                    }
                    }
                );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return items;
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.merge( ???????? )
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

编辑:

你可能猜到我是一个被动的新手。@akarnokd 的回答帮助我意识到我想做的事情是愚蠢的。Observable<Observable<JsonDocument>>解决方案是合并闭包内项目的排放document并返回结果。这会发出以下JsonDocuments结果flatMap

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create Observable that gets and emits zero or more 
        // Observable<Observable<JsonDocument>> ie. bucket.async().gets
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                itemArray.forEach(
                    (jsonObject) -> {
                        String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                        observer.onNext( 
                            bucket.async().get(itemId)
                        );
                    }
                    }
                );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return Observable.merge(items);
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

测试和工作:)

4

2 回答 2

0

由于 Java 的表达限制,我们不能有一个merge()可以应用于Observble<Observable<T>>. 它需要扩展方法,例如 C# 中的方法。

下一个最好的事情是做一个身份flatMap

// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
于 2015-10-21T14:03:37.483 回答
0

您可以调用toList()将所有发出的项目收集到一个列表中。我还没有测试过,但是像这样的东西呢:

bucket.async()
  .get(id)
  .flatmap(document -> { return Observable.from((JsonArray)document.content().get("item")})
  .flatMap(bucket::get)
  .toList()
  .subscribe(results -> /* list of documents */);
于 2015-10-21T22:34:24.700 回答