4

我试图了解如何使用 rxcpp,我的印象是,当一个 observable 发出一个值时,所有订阅的观察者都会通过调用他们的 on_next() 方法得到通知,并将发出的值传递给他们。

以下示例并非如此:

auto eventloop = rxcpp::observe_on_event_loop();

printf("Start task\n");

auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
        [](int i){
            printf("Observable sending: %d\n", i);
            return i;
        }
);

values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#1 onNext: %d\n", v);},
        [](){printf("#1 onCompleted\n");});

values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#2 onNext: %d\n", v);},
        [](){printf("#2 onCompleted\n");});

printf("Finish task\n");

我希望输出类似于:

Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task

即当新值出现时,所有订阅的观察者都会调用 on_next。

相反,输出实际上是:

Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task
4

1 回答 1

3

这是经典的冷热行为。

hot observable 会如你所愿。间隔是冷可观察的,因此每个订阅都会产生一组独立的值。

发布操作员将获取单个冷可观察对象并将其作为热可观察对象共享。

在这种情况下,它会。

auto sharedvalues = values.publish().ref_count();

然后在订阅表达式中使用sharedvalues而不是。values

搜索 hot vs cold observables 会发现对这个话题的广泛讨论。

于 2016-09-25T19:18:09.557 回答