11

我目前正试图用 RX .NET 来解决并发问题,并被某些东西弄糊涂了。我想并行运行四个相对较慢的任务,所以我认为NewThreadScheduler.Default这是要走的路,因为它“代表了一个将每个工作单元安排在单独线程上的对象”。.

这是我的设置代码:

    static void Test()
    {
        Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId);

        var query = Enumerable.Range(1, 4);
        var obsQuery = query.ToObservable(NewThreadScheduler.Default);
        obsQuery.Subscribe(DoWork, Done);

        Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

    static void DoWork(int i)
    {
        Thread.Sleep(500);
        Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId);
    }

    static void Done()
    {
        Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

我假设“X Thread Y”每次都会输出不同的线程 id,但实际输出是:

Starting. Thread 1
Last line. Thread 1
1 Thread 3
2 Thread 3
3 Thread 3
4 Thread 3
Done. Thread 3

所有的工作都是按顺序在同一个新线程上完成的,这不是我所期望的。

我假设我错过了一些东西,但我不知道是什么。

4

1 回答 1

13

可观察查询有两个部分,它Query本身和Subscription. (这也是 ObserveOn 和 SubscribeOn 运算符之间的区别。)

Query的是

Enumerable
    .Range(1, 4)
    .ToObservable(NewThreadScheduler.Default);

这会创建一个 observable,它会在该系统的默认值上生成值NewThreadScheduler

您的订阅是

obsQuery.Subscribe(DoWork, Done);

这将DoWork针对由 生成的每个值运行,Query并且Done在调用Query结束时运行OnComplete。我认为对于将调用订阅方法中的函数的线程没有任何保证,实际上,如果查询的所有值都是在将运行订阅的线程的同一线程上生成的。看来他们也在这样做,所以所有订阅调用都是在同一个线程上进行的,这很可能是为了消除许多常见的多线程错误。

所以你有两个问题,一个是你的日志记录,如果你改变你Query

Enumerable
    .Range(1, 4)
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId);
    .ToObservable(NewThreadScheduler.Default);

您将看到在新线程上生成的每个值。

另一个问题是 Rx 的意图和设计之一。这一个Query长期运行的过程,Subscription是一个处理结果的简短方法。如果您想将长时间运行的函数作为 Rx Observable 运行,您最好的选择是使用Observable.ToAsync

于 2013-07-21T18:53:16.233 回答