1

我有一种情况,我需要使用自定义调度程序来运行任务(这些必须是任务)并且调度程序没有设置同步上下文(所以我收集了 no ObserveOn,SubscribeOn等)。SynchronizationContextScheduler以下是我最终如何做到的。现在,我想知道,我不确定这是否是进行异步调用并等待其结果的最合适方式。这是可以的还是有更健壮或惯用的方式?

var orleansScheduler = TaskScheduler.Current;
var someObservable = ...;
someObservable.Subscribe(i =>
{
    Task.Factory.StartNew(async () =>
    {
        return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);          
});

如果不需要等待怎么办?

<编辑:我找到了一个具体的简化示例来说明我在这里所做的事情。基本上我在奥尔良使用 Rx,上面的代码是我要做的简单说明。虽然我也对这种情况感兴趣。

最终代码 事实证明,这在奥尔良语境中有点棘手。我不知道如何才能使用ObserveOn,这正是我想要使用的东西。问题是通过使用它,Subscribe永远不会被调用。编码:

var orleansScheduler = TaskScheduler.Current;
var factory = new TaskFactory(orleansScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
var someObservable = ...;
someObservable
//.ObserveOn(rxScheduler) This doesn't look like useful since...
.SelectMany(i =>
{
    //... we need to set the custom scheduler here explicitly anyway.
    //See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
    //Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
    //in turn runs always on .NET ThreadPool and not on Orleans scheduler and hence
    //the following .Subscribe wouldn't be called. 
    return Task.Factory.StartNew(async () =>
    { 
       //In reality this is an asynchronous grain call. Doing the "shorthand way"
       //(and optionally using ObserveOn) would get the grain called, but not the
       //following .Subscribe. 
       return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
})
.Subscribe(i =>
{
    Trace.WriteLine(i);
});

此外,还有Codeplex Orleans 论坛上相关主题的链接。

4

2 回答 2

4

我强烈建议不要StartNew使用任何现代代码。它确实有一个用例,但非常罕见。

如果您必须使用自定义任务调度程序,我建议使用ObserveOn由调度程序周围的包装器TaskPoolScheduler构造的。TaskFactory这是一口,所以这是一般的想法:

var factory = new TaskFactory(customScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...

然后,您可以SelectMany在源流中的每个事件到达时为它们启动异步操作。

另一种不太理想的解决方案是async void用于您的订阅“事件”。这是可以接受的,但您必须注意错误处理。作为一般规则,不允许异常从 async void 方法传播出去。

还有第三种选择,您可以将 observable 挂接到 TPL 数据流块中。像这样的块ActionBlock可以指定其任务调度程序,Dataflow 自然理解异步处理程序。请注意,默认情况下,Dataflow 块将一次将处理限制为单个元素。

于 2014-11-11T21:53:45.333 回答
3

一般来说,与订阅执行相比,将任务参数投影到任务执行中并仅订阅结果会更好/更惯用。这样您就可以在下游进一步使用 Rx 进行组合。

例如,给定一个随机任务,例如:

static async Task<int> DoubleAsync(int i, Random random)
{
    Console.WriteLine("Started");
    await Task.Delay(TimeSpan.FromSeconds(random.Next(10) + 1));
    return i * 2;
}

然后你可能会这样做:

void Main()
{
    var random = new Random();

    // stream of task parameters
    var source = Observable.Range(1, 5);

    // project the task parameters into the task execution, collect and flatten results
    source.SelectMany(i => DoubleAsync(i, random))

          // subscribe just for results, which turn up as they are done
          // gives you flexibility to continue the rx chain here
          .Subscribe(result => Console.WriteLine(result),
                    () => Console.WriteLine("All done."));
}
于 2014-11-11T21:47:09.837 回答