我正在使用来自 NuGet 的最新 Rx2。在下面的示例代码中,订阅是在新线程上处理的,不会阻止新值的生成。这都很好。但是,为什么 observable 生成的值会在新线程上按顺序处理?
static void Main(string[] args)
{
printThreadId("Started");
var observable = Observable.Generate<int, int>(
0,
x => {
printThreadId("Comparing " + x);
return x < 5; },
x => {
printThreadId("Incrementing " + x);
return x + 1; },
x => {
printThreadId("Selecting " + x);
return x; },
NewThreadScheduler.Default
);
var disp = observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(state =>
{
printThreadId("Processing " + state);
Thread.Sleep(1000);
});
Console.ReadLine();
disp.Dispose();
}
static Action<string> printThreadId = (prefix) => Console.WriteLine("{0} on [Worker.{1}]", prefix, Thread.CurrentThread.ManagedThreadId);
上面的代码大约需要 5 秒才能完成。我希望每个动作都在一个新线程上执行,但这并没有发生。
下面的代码安排了一个新任务,整个过程在 1 秒多一点的时间内完成。
var disp = observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(state =>
{
printThreadId("Processing " + state);
NewThreadScheduler.Default.Schedule(() =>
{
printThreadId("Long running task " + state);
Thread.Sleep(1000);
});
});
有没有更好的方法来安排长时间运行的任务同时使用ObserveOn
and运行Subscribe
?