2

我正在学习响应式扩展,这些天我遇到了这种情况,代码在这里:

    class Program
{
    private static void Main(string[] args)
    {
        var ls = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 }.ToObservable();
        ls.Select(m => new
            {
                t = Observable.Start(() =>
                    {
                        Thread.Sleep(100);
                        return new Random().Next(3, 20);
                    }),
                i = m
            }).Subscribe(item => item.t.Subscribe(Console.WriteLine));
        Task.WaitAll();
        Console.WriteLine("all done");
        Console.ReadKey();
    }
}

它表明 Observable 中有一个 IObservable,我想在所有过程完成后打印“all done”,但这不起作用。程序启动后“全部完成”打印得非常快,不再等待,在我这里的情况下,我应该怎么做才能获得真正的WaitAll?

4

2 回答 2

2

这并不是 Rx 的工作原理。这里Task.WaitAll()和您的 Rx 代码之间没有链接。您甚至没有将任何任务传递给该WaitAll()方法;-)

因此,首先,该Subscribe方法是非阻塞的。它只是说明此时我想开始使用这个序列,这就是在向我发送值/错误/完成通知时要做的事情。

您的嵌套 Observable 序列是一个相当高级的主题,可以直接进入,但没关系,我们可以使用它。

class Program
{
    private static void Main(string[] args)
    {
        //Let go, we are not IEnumerable any more :-)
        var ls = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 }.ToObservable();

        var subscription = ls.Select(_ =>
                Observable.Start(() =>
                    {
                        Thread.Sleep(100);
                        return new Random().Next(3, 20);
                    })
             })
            .Merge() //Merge the IO<IO<T>> into Io<T> so we get a single completion.
            .Subscribe(
                item => item.Subscribe(Console.WriteLine),
                ()=>Console.WriteLine("all done"));


        Console.ReadKey();
        subscription.Dispose();
    }
}

您可以通过将Observable.Start+替换Thread.Sleep为 Rx 方法(如Observable.TimerScheduler.

这里要带走的关键是 Rx 是异步的。关键是不要阻止。这段代码中唯一阻塞的是Thread.SleepConsole.ReadKey()。理想情况下,如上所述,您Thread.Sleep无论如何都会更换。

于 2013-09-17T21:40:36.327 回答
2

您似乎在这里进行了一些“非 Rx”编码。您尝试执行的任务实际上非常简单。

首先,您有一些代码在完成一些工作后会产生值。我已将其重新编码为:

var rnd = new Random();
Func<int> produceValue = () =>
{
    Thread.Sleep(100);
    return rnd.Next(3, 20);
};

这使它很好地与 Rx 代码分开。作为旁注,我已将new Random()声明拉到函数之外,因为继续实例化新Random实例是不正确的 - 你不一定会以这种方式获得随机数。您还应该实例化一次并使用相同的实例。

所以现在产生 observable 的代码是直截了当的:

var query =
    from n in Observable.Range(1, 9)
    from m in Observable.Start(produceValue)
    select m;

订阅它也很容易:

query.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("All Done."));

我认为这正是您尝试编写的代码而没有任何讨厌的WaitFor代码。

于 2013-09-18T04:05:52.817 回答