2
 Observable
        .from(couchbaseDocuments)
        .subscribeOn(Schedulers.io())
        .flatMap(docToInsert->asyncBucket.insert(docToInsert))
        .retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
        .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
        .toBlocking()
        .forEach(id->insertedIds.add(id));

要求:

  1. 在特定类型的沙发底座中批量插入文档。
  2. 创建另一个文档,其中仅包含我们在第一步中插入的文档的 ID
  3. 如果第一步在我们停止插入的某个 id 处失败,则第二个文档应该只有那些在异常发生之前插入的 id
  4. 调用是同步的

我是 RxJava 的新手。我写了上面的反应代码,但似乎我还没有清楚地理解一些概念。我的想法是,最后的 forEach 将始终获取发出的项目,如果发生异常,我将捕获它,然后使用 insertIds 列表创建第二个文档。但是,该列表始终包含所有不满足我要求的 ID。

谁能解释一下代码有什么问题以及如何实现上述要求?

4

1 回答 1

2

这些retry方法将重新订阅上游Observable

在您的情况下,这意味着订阅couchbaseDocuments并可能尝试重新插入已成功插入的文档。

您可能更愿意重试失败的插入,而不是再次重试整个流:

 Observable
        .from(couchbaseDocuments)
        .subscribeOn(Schedulers.io())
        .flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(...))
        .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
        .toBlocking()
        .forEach(id->insertedIds.add(id));

本质上:你必须移动一个括号。

于 2018-07-23T15:40:44.793 回答