1

我如何将MainEngineObservable 转换为 Cold ?从这个例子:

    public IObservable<int> MainEngine
    {
        get
        {
            Random rnd = new Random();
            int maxValue = rnd.Next(20);
            System.Diagnostics.Trace.TraceInformation("Max value is: " + maxValue.ToString());

            return (from sinlgeInt in Enumerable.Range(0, maxValue)
                    select sinlgeInt).ToObservable();
        }
    }

    public void Main()
    {
        // 1
        MainEngine.Subscribe(
                onNext: (item) => { System.Diagnostics.Trace.TraceInformation("Value is: " + item.ToString()); }
        );

        // 2
        MainEngine.Subscribe(
                onNext: (item) => { System.Diagnostics.Trace.TraceInformation("Gonna put it into XML: " + item.ToString()); }
        );
    }

问题 1:在订阅者 1 和订阅者 2 上,我得到不同的结果,但我希望他们都收到相同的结果。

问题2:从我添加第二个订阅者的时间点开始,他们都继续收到相同的结果。

4

2 回答 2

4

关于您的第一个问题,问题在于观察者没有订阅相同的IObservable内容,因为您两次调用吸气剂。

将 分配IObservable给局部变量似乎可以解决问题:

IObservable<int> mainEngine = MainEngine;

mainEngine.Subscribe(onNext: (item) => { /* ... */ });
mainEngine.Subscribe(onNext: (item) => { /* ... */ });  

关于你的第二个问题,如果你想分享一个单一的订阅IObservable,你可以使用以下Publish方法:

IConnectableObservable<int> published = MainEngine.Publish();

published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 1"); });
published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 2"); });

published.Connect();

然后,两个订阅者将以IObservable交错的方式查看结果:

0 on observer 1
0 on observer 2
1 on observer 1
1 on observer 2
etc.

您还可以在调用订阅后订阅新的观察者,之后所有订阅者将看到相同的事件。您可以通过在新线程上运行您的 observable 并引入延迟来修改您的示例来测试这一点:

public static void Main()
{
    Random rnd = new Random();
    int maxValue = rnd.Next(20);

    /* Zip with Observable.Interval to introduce a delay */
    IObservable<int> mainEngine = Observable.Range(0, maxValue, Scheduler.NewThread)
        .Zip(Observable.Interval(TimeSpan.FromMilliseconds(100)), (a, b) => a);

    /* Publish the observable to share a subscription between observers */
    IConnectableObservable<int> published = mainEngine.Publish();

    /* Subscribe the first observer immediately, events are not yet being observed */
    published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 1"); });

    /* Start pushing events to the first observer */
    published.Connect();

    /* Wait one second and then subscribe the second observer */
    Thread.Sleep(1000);
    published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 2"); });

    Console.ReadKey();
}

您将仅在第一个观察者上看到一秒钟的事件,然后两个观察者将同时看到每个事件。

于 2012-08-14T16:40:46.363 回答
3

你的 observable 已经很冷了。如果您每次订阅 observable 时都获取一个实例,那么您将获得相同的值。

它看起来很热的唯一方法是,如果你多次调用MainEngine你会得到不同的可观察实例。但这并不能使它们真正起来。

实际上,您已经创建了一个冷的可观察工厂

要使该MainEngine方法真正变热,您需要添加一个Defer调用,如下所示:

public IObservable<int> MainEngine
{
    get
    {
        return Observable.Defer(() =>
        {
            Random rnd = new Random();
            int maxValue = rnd.Next(20);

            System.Diagnostics.Trace.TraceInformation(
                "Max value is: " + maxValue.ToString());

            return Observable.Range(0, maxValue);
        });
    }
}

另请注意,我更改Enumerable.RangeObservable.Range并删除了对.ToObservable().

为了现在真正让它变热,这里是做什么的:

var hotObservable = MainEngine.Publish().RefCount();

这实际上意味着当您同时订阅多个 observable 时,它​​们将共享底层 observable。当没有订阅者时,底层的 observable 就会消失,只有在新的观察者订阅时才会创建。

请记住,您的MainEngine默认实现是使用运行的,Scheduler.Immediate因此在您将可观察对象更改为在不同的线程上运行之前,您不会看到此代码的好处。

我希望这有帮助。

于 2012-08-15T01:55:04.477 回答