4

你好,我已经尝试了 101 个 Rx 示例之一:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

我不明白为什么“按任意键取消订阅”这一行从不显示。我的理解是订阅是异步的,您订阅它并立即返回。我错过了什么导致我的主线程阻塞?

4

2 回答 2

7

阻塞是由您的可枚举循环while (true)IEnumerable<T>.ToObservable()默认为CurrentThreadScheduler.

如果您提供Scheduler.TaskPool(或Scheduler.ThreadPool在 .NET 4 之前的版本中)重载ToObservable,您应该会看到您期望的行为(尽管它不会在主线程上调用您的订阅者,仅供参考)。

话虽如此,我想你会发现你的组合Thread.Sleep并且Throttle会按你的预期工作。您最好创建一个使用调度程序来安排延迟的自定义 observable。

于 2011-02-25T10:35:47.967 回答
2

我同意理查德的观点。

的实现.ToObservable()如下所示:

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return source.ToObservable<TSource>(Scheduler.CurrentThread);
}

它使用 and 调用.ToObservable(IScheduler)重载,Scheduler.CurrentThread因为您正在使用.Sleep(...)导致可观察对象必须完成的延迟,然后代码才能超出该.Subscribe(...)方法。想想如果这段代码都在一个线程上运行(它就是这样),它的行为会是什么样子。

为了解决这个问题,您可以按照 Richard 的建议使用任务池或线程池调度程序,但我认为您的代码存在更基本的问题。那就是您正在使用“老派”线程睡眠,而不是依赖 Rx 方法。

试试这个来生成你的 observable:

var observable =
    Observable
        .GenerateWithTime(0, i => i <= 1000, i => i + 1,
            i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
        .Timestamp();

GenerateWithTime(...)完成您的GenerateAlternatingFastAndSlowEvents方法所做的所有事情,但它直接创建可观察对象并使用Scheduler.ThreadPool底层进行,因此您无需指定任何调度程序。

于 2011-02-25T10:48:40.743 回答