我有以下代码片段,它枚举了一些 xml 的元素(从svn log --xml ...
进程的输出中读取),然后为每个 xml 元素运行一个长时间运行的方法。
var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);
var xElements = xml.Descendants("path")
.ToObservable()
//.SubscribeOn(ThreadPoolScheduler.Instance)
.Select(descendant => return LongRunning(descendant));
xElements
//.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(result => Console.WriteLine(result);
Console.ReadKey();
该LongRunning
方法并不重要,但在其中我记录了它运行的线程。让我们假设它运行了一整秒。
我的问题是,取消注释任何SubscribeOn()
一行都没有任何效果。调用LongRunning
是连续的,并且每秒发生一次,在同一个线程上(尽管与主(初始)线程不同的线程)。
这是一个控制台应用程序。
我是 Rx 的新手。我错过了什么?
编辑:
在尝试了 Lee Campbell 的回答后,我注意到了另一个问题。
Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);
var xElements = xml.Descendants("path").ToObservable()
//.ObserveOn(Scheduler.CurrentThread)
.SelectMany(descendant =>
Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
.Subscribe(result => Console.WriteLine(
"Result on: " + Thread.CurrentThread.ManagedThreadId));
[...]
string LongRunning(XElement el)
{
Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
DoWork();
Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId);
return "something";
}
这给出了以下输出:
Main thread 1
Execute on: Thread 3
Execute on: Thread 4
Execute on: Thread 5
Execute on: Thread 6
Execute on: Thread 7
Finished on Thread 5
Finished on Thread 6
Result on: 5
Result on: 6
Finished on Thread 7
Result on: 7
Finished on Thread 3
Result on: 3
Finished on Thread 4
Result on: 4
Done! Press any key...
我需要的是一种将结果“排队”到同一个线程的方法。我认为这就是ObserveOn()
目的,但取消注释ObserveOn()
上面的行并不会改变结果。