Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert))
.retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
要求:
- 在特定类型的沙发底座中批量插入文档。
- 创建另一个文档,其中仅包含我们在第一步中插入的文档的 ID
- 如果第一步在我们停止插入的某个 id 处失败,则第二个文档应该只有那些在异常发生之前插入的 id
- 调用是同步的
我是 RxJava 的新手。我写了上面的反应代码,但似乎我还没有清楚地理解一些概念。我的想法是,最后的 forEach 将始终获取发出的项目,如果发生异常,我将捕获它,然后使用 insertIds 列表创建第二个文档。但是,该列表始终包含所有不满足我要求的 ID。
谁能解释一下代码有什么问题以及如何实现上述要求?