3

我有一个用例,我正在消费消息,保存它们然后回复成功或失败。mongo insert 返回一个 Observable,因此我可以使用 flatmap 进行链接。问题是 insert Observable 发出插入的结果,但我需要从第一个 observable 发出的原始 Message 发出来回复。因此,为了完成这项工作,我在第一个 Observable 的订阅中运行插入,并在第二个订阅中回复。

我希望使用像 flatmap 这样的操作符以一种更具反应性的方式来实现这一点。我搜索了运算符列表,但没有找到我要查找的内容。

eb.consumer("persister.save.event").toObservable()
    .subscribe(msg -> {
        mongo.insertObservable("event", (JsonObject) msg.body())
            .subscribe(
                res -> msg.reply(new JsonObject().put("success", true)),
                error -> msg.fail(500, "failed to save event"));
            });

上面的代码是应该完成的方式还是有更好的方法?两个订阅者感觉不对劲。

4

1 回答 1

2

以下是可以避免两个订阅者的方法:

eb.consumer("persister.save.event").toObservable()
    .flatMap(msg -> mongo.insertObservable("event", (JsonObject) msg.body()).map(mongoResponse -> msg))
    .subscribe(
            res -> msg.reply(new JsonObject().put("success", true)),
            error -> msg.fail(500, "failed to save event"));

诀窍是让map你的 mongo 结果msgflatMap.

于 2016-03-28T18:41:29.590 回答