2

使用 RxJava,我有一个源 Observable,它发出许多我希望与另一个发出相同类型的 Observable 相交的项目。在研究了许多选项之后,似乎最连贯的方式来构建事物是这样的:

Observable<String> source = ...emits 20 items

Observable.create(subscriber -> {
    source
        .buffer(5)
        .subscribe(things -> {
            tocheck.getMatches(things) //emits 3 matches
                .subscribe(subscriber::onNext, subscriber::onError, () -> {});
        }, subscriber::onError, subscriber::onCompleted));

这里的预期输出是,当我订阅生成的 Observable 时,会发出 12 个项目。由于 getMatches 的合同,我需要缓冲结果。

从表面上看,这似乎可行,但似乎不是最干净的方式。过滤器似乎不适用于此处,因为出于性能原因,我无法对每个项目运行相交检查。我玩弄了使用 flatMap 但 getMatches observable 完成了流,而不是来自源 observable 的完成通知。

有没有更好的方法来构建它?

编辑:澄清这种代码风格发生了什么:

Observable<String> source = ...emits 20 items

source
    .buffer(5)
    .flatMap(this::getMatches);  //final observable would emit a total of 12 items

这显然更干净,但是当我添加一些日志记录时(假设数据大小与原始片段相同:

source
    .doOnEach(notification -> {
        log.trace("Processing {}", notification.getValue());
    })
    .buffer(5)
    .flatMap(this::getMatches)
    .doOnEach(notification -> {
        log.trace("Processing after match {}", notification.getValue());
    });

我得到了 20 个“处理”日志实例,然后奇怪的是“处理之后”只有几条日志行(当我期望 12 时)。它似乎比应有的时间更早地调用了完成。也许我的结构有问题?

4

1 回答 1

1

所以看起来 AndroidEx 是当场的。我正在使用 Redis Lettuce 反应式 API,但它的行为似乎不正常。上面添加的代码片段是构造两个 Observable 交集的正确方法。

于 2016-01-23T23:23:43.057 回答