9

我知道PipeTo,但有些东西,比如嵌套延续的同步等待,似乎违背了 async & await 方式。

所以,我的第一个问题 [1] 是:这里有什么“魔法”,这样我们就可以连续同步等待嵌套任务,最终它仍然是异步的?

当我们处于异步和等待差异时,如何处理失败?

让我们创建一个简单的示例:

public static class AsyncOperations
{
    public async static Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(1000).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
        //return 42;
    }

    public async static Task<string> ConvertAsync(int number)
    {
        await Task.Delay(600).ConfigureAwait(false);
        return number + " :)";
    }
}

在“常规”、异步和等待方式中:

var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);

正如您所期望的那样,异常将从第一次操作中冒出来。

现在,让我们创建一个将处理这些异步操作的参与者。为了争论,假设CalculateAnswerAsyncandConvertAsync应该一个接一个地用作一个完整的操作(类似于,例如,StreamWriter.WriteLineAsync如果StreamWriter.FlushAsync您只想将一行写入流)。

public sealed class AsyncTestActor : ReceiveActor
{
    public sealed class Start
    {
    }

    public sealed class OperationResult
    {
        private readonly string message;

        public OperationResult(string message)
        {
            this.message = message;
        }

        public string Message
        {
            get { return message; }
        }
    }

    public AsyncTestActor()
    {
        Receive<Start>(msg =>
               {
                   AsyncOperations.CalculateAnswerAsync()
                     .ContinueWith(result =>
                            {
                                var number = result.Result;
                                var conversionTask = AsyncOperations.ConvertAsync(number);
                                conversionTask.Wait(1500);
                                return new OperationResult(conversionTask.Result);
                            })
                     .PipeTo(Self);
                });
        Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
    }
}

如果没有例外,我仍然Got 42 :)没有任何问题,这让我回到了上面的“魔术”点 [1]。此外,示例中提供的AttachedToParentExecuteSynchronously标志是可选的,还是几乎需要它们才能使一切按预期工作?它们似乎对异常处理没有任何影响......

现在,如果CalculateAnswerAsync抛出异常,这意味着result.Resultthrows AggregateException,它几乎被吞没了。

如果可能的话,我应该在这里做什么,以使异步操作中的异常像“常规”异常一样使参与者崩溃?

4

2 回答 2

10

TPL 中错误处理的乐趣:)

一旦一个任务开始在它自己的线程上运行,它里面发生的一切都已经与调用者异步了——包括错误处理

  1. 当您开始您的第Task一个演员内部时,该任务独立于ThreadPool您的演员运行。这意味着你在里面所做的任何事情Task都已经与你的actor异步了——因为它在不同的线程上运行。这就是为什么我在Task.WaitPipeTo帖子顶部链接到的示例中进行了调用。对演员没有影响 - 它只是看起来像一个长期运行的任务。
  2. 异常 - 如果您的内部任务失败,该conversionTask.Result属性将抛出在其运行期间捕获的异常,因此您需要在您的内部添加一些错误处理,Task以确保您的演员收到出错的通知。请注意,我在这里做到了:https ://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - 如果您将异常转换为消息你的演员可以处理:鸟儿开始歌唱,彩虹闪耀,TPL 错误不再是痛苦和痛苦的来源。
  3. 至于抛出异常时会发生什么......

现在,如果 CalculateAnswerAsync 抛出异常,这意味着 result.Result 抛出 AggregateException,它几乎被吞没了。

AggregateException包含包含在其中的内部异常列表 - TPL 具有这种聚合错误概念的原因是在以下情况下:(a)您有一个任务是多个任务聚合的延续,即Task.WhenAll或(b)您有错误沿链传播ContinueWith回父级。您还可以调用该AggregateException.Flatten()调用以更轻松地管理嵌套异常。

TPL + Akka.NET 的最佳实践

处理来自 TPL 的异常是一件令人讨厌的事情,这是事实 - 但处理它的最佳方法是处理try..catch..您内部的异常Task并将它们转换为您的参与者可以处理的消息类。

此外,示例中提供的 AttachedToParent 和 ExecuteSynchronously 标志是可选的,还是几乎需要它们才能使一切按预期工作?

当您在延续上有延续时,这主要是一个问题 -PipeTo自动在其自身上使用这些标志。它对错误处理的影响为零,但确保您的延续立即在与原始Task.

我建议仅在您执行大量嵌套延续时才使用这些标志 - 一旦您深入超过 1 个延续,TPL 开始对如何安排您的任务采取一些自由(事实上,像 OnlyOnCompleted 这样的标志在更多之后停止被接受超过 1 个继续。)

于 2015-02-16T22:08:54.707 回答
7

只是为了补充亚伦所说的话。截至昨天,当使用任务调度器时,我们确实支持演员内部的安全异步等待。

public class AsyncAwaitActor : ReceiveActor
{
    public AsyncAwaitActor()
    {
        Receive<string>(async m =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Sender.Tell("done");
        });
    }
}

public class AskerActor : ReceiveActor
{
    public AskerActor(ActorRef other)
    {
        Receive<string>(async m =>
        {
            var res = await other.Ask(m);
            Sender.Tell(res);
        });
    }
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
    [Fact]
    public async Task Actors_should_be_able_to_async_await_ask_message_loop()
    {
        var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>()
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Worker");
        //IMPORTANT: you must use the akka.actor.task-dispatcher
        //otherwise async await is not safe

        var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor))
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Asker");

        var res = await asker.Ask("something");
        Assert.Equal("done", res);
    }
}

这不是我们的默认调度程序,因为它确实以性能/吞吐量为代价。如果您触发阻塞的任务(例如使用task.Wait()or task.Result),也存在死锁的风险。因此该PipeTo模式仍然是首选方法,因为它更适合于 Actor 模型。但是,如果您确实需要进行一些 TPL 集成,那么异步等待支持可以作为一个额外的工具。

此功能实际上PipeTo在幕后使用。它将接受每个任务的延续,并将其包装在一个特殊的消息中,并将该消息传递回参与者,并在参与者自己的并发上下文中执行该任务。

于 2015-02-17T05:52:30.640 回答