4

好的,试图理解 Rx,有点迷失在这里。

FromAsyncPattern 现在已被弃用,所以我从这里(使用 Rx 点亮任务部分)举了一个例子,它起作用了,我只是做了一些更改,不使用 await 只是等待可观察和订阅.....

我不明白的是为什么函数 SumSquareRoots 被称为两次?

 var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();


class Program
{
    static void Main(string[] args)
    {
        Samples();
    }

    static void Samples()
    {
        var x = 100000000;

        try
        {
            var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();
        }
        catch (TimeoutException)
        {
            Console.WriteLine("Timed out :-(");
        }
    }

    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            var res = 0.0;
            Console.WriteLine("Why I'm called twice");
            for (long i = 0; i < count; i++)
            {
                res += Math.Sqrt(i);

                if (i % 10000 == 0 && ct.IsCancellationRequested)
                {
                    Console.WriteLine("Noticed cancellation!");
                    ct.ThrowIfCancellationRequested();
                }
            }

            return res;
        });
    }
}
4

1 回答 1

12

这是两次调用 SumSquareRoots 的原因是因为您订阅了两次:

// Subscribes to res
res.Subscribe(y => Console.WriteLine(y));

// Also Subscribes to res, since it *must* produce a result, even
// if that result is then discarded (i.e. Wait doesn't return IObservable)
res.Wait();

SubscribeforeachRx 的 - 就像如果你两次,你最终可能会做 2 倍的工作,多个foreachs意味着工作的倍数。要撤消此操作,您可以使用不会丢弃结果的阻塞调用:IEnumerableSubscribe

Console.WriteLine(res.First());

或者,您可以使用Publish“冻结”结果并将其回放给 > 1 个订阅者(有点像您ToArray在 LINQ 中使用的方式):

res = res.Publish();
res.Connect();

// Both subscriptions get the same result, SumSquareRoots is only called once
res.Subscribe(Console.WriteLine);
res.Wait();

您可以遵循的一般规则是,任何返回IObservable<T>Task<T>将导致 Subscription(*) 的Rx 方法

* - 技术上不正确。但是如果你这样想,你的大脑会感觉更好。

于 2012-11-26T22:30:30.177 回答