2

我正在查看 Akka.net faulttolerance 并设置了一个简单的示例,其中 Actor1 告诉 Actor2 一条消息。Actor2 抛出异常。Actor1 有一个 SuperVisorStrategy 告诉失败的actor恢复。

我实际上预计该消息会再次传递给 Actor2。但事实并非如此。所以 Actor2 恢复并可以继续处理新消息。但是现在导致 Actor 失败的消息已经消失了。这应该如何处理?我不想丢失导致异常的消息。我希望 Actor2 再次处理该消息。

namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            using (ActorSystem actorSystem = ActorSystem.Create("test"))
            {
                IActorRef customer = actorSystem.ActorOf(Props.Create<Actor1>(), "actor1");

                customer.Tell(new Start());
                Console.Read();
            }
        }
    }

    public class Actor1 : UntypedActor
    {
        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(3, TimeSpan.FromSeconds(5), ex =>
            {
                if (ex is ApplicationException)
                    return Directive.Resume;

                return Directive.Escalate;
            });
        }

        protected override void OnReceive(object message)
        {
            if (message is Start)
            {
                IActorRef actor2Ref = Context.ActorOf<Actor2>("actor2");
                Context.Watch(actor2Ref);

                actor2Ref.Tell(new DoSomething());
            }

            else if (message is Response)
            {
                Console.WriteLine("Response received");
                return;
            }
            else if (message is Terminated)
            {
                Console.WriteLine("Terminated");
            }
        }
    }

    public class Actor2 : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            if (message is DoSomething)
            {
                // only called once.
                throw new ApplicationException("testexception");
            }
        }
    }

    public class Start
    {
    }

    public class DoSomething
    {
    }

    public class Response
    {
    }
}
4

1 回答 1

2

通常,您不太可能希望连续多次重新处理相同的易出错消息。如果它导致异常,在同一毫秒内一次又一次地处理它可能会带来完全相同的结果。

可能想要实现的是尝试在延迟后重新处理它,即因为您正在尝试重新连接到外部服务。在这种情况下,您可能希望将其包装到另一条消息中并在 ActorSystem 调度程序中安排重新发送。基本示例:

sealed class Retry
{
    public readonly object Message;
    public readonly int Ttl;

    public Retry(object message, int ttl)
    {
        Message = message;
        Ttl = ttl;
    }
}

class MyActor : ReceiveActor 
{
    ...

    protected override void PreRestart(Exception reason, object message)
    {
        Retry oldRetry;
        var retry = (oldRetry = message as Retry) != null 
            ? new Retry(oldRetry.message, oldRetry.Ttl - 1)
            : new Retry(message, retryCount);

        if (retry.Ttl > 0)
            Context.System.Scheduler.ScheduleTellOnce(delay, Self, retry, Sender);

        base.PreRestart(reason, message);
    }
}

根据您的需要,其他一些情况也可能涉及使用CircuitBreakers或更可靠的交付语义 - 默认情况下,Akka.NET 提供最多一次交付语义,您可以使用 Akka.Persistence 插件中的 AtLeastOnceDelivery 组件更改此设置。

于 2015-12-22T14:11:25.317 回答