观察 Terminated 消息将提醒系统远程参与者已经死亡,但有一个问题是如何准确地响应终止的远程参与者。假设一个actor通过其构造函数获得了一个远程actor的IActorRef,当它再次活跃时,该actor如何获取远程actor的一个新的IActorRef。一种方法是让actor失败并委托给父actor,然后父actor将通过actor选择获得一个新的IActorRef给远程actor。然而,这样做的问题是,远程参与者的原始参与者选择可能发生在组合根中的非参与者代码中,通常会发生依赖注入。我想你可以通过传递一个演员选择工厂委托来解决这个问题,该委托可用于重建远程 IActorRef。
此类在构造函数中获取远程(或本地)参与者的路径,并定期执行参与者选择以获取远程参与者的刷新 IActorRef。这样,如果由于某种原因远程参与者死亡,对 FaultTolerantActorRef 的调用将在远程参与者死亡时以死信告终。然而,当远程参与者最终再次上线时,对 FaultTolerantActorRef 的调用最终将到达新恢复的远程参与者,而无需对调用本地参与者采取任何显式操作。
有一个 Invalidate 方法将强制 FaultTolerantActorRef 在下一次调用时进行新的演员选择。这可能由一个actor调用,以响应来自远程actor的 Terminated 消息。即使不调用 Invalidate,也会根据传递给构造函数的刷新间隔进行新的 Actor 选择。
using Akka.Actor;
using System;
using Akka.Util;
using System.Threading;
namespace JA.AkkaCore
{
public class FaultTolerantActorRef : IActorRef
{
public IActorRef ActorRef
{
get
{
if (!_valid || DateTime.Now.Ticks > Interlocked.Read(ref _nextRefreshTime))
RefreshActorRef();
return _actorRef;
}
}
public ActorPath Path
{
get
{
return ActorRef.Path;
}
}
object _lock = new object();
IActorRef _actorRef;
volatile bool _valid;
string _path;
IActorRefFactory _actorSystem;
private TimeSpan _requestTimeout;
private TimeSpan _refreshInterval;
//private DateTime _nextRefreshTime = DateTime.MinValue;
private long _nextRefreshTime = DateTime.MinValue.Ticks;
public FaultTolerantActorRef(IActorRefFactory actorSystem, IActorRef actorRef,
TimeSpan refreshInterval = default(TimeSpan), TimeSpan requestTimeout = default(TimeSpan))
: this(actorSystem, actorRef.Path.ToString(), refreshInterval, requestTimeout)
{
_actorRef = actorRef;
_valid = true;
}
public FaultTolerantActorRef(IActorRefFactory actorSystem, string actorPath,
TimeSpan refreshInterval = default(TimeSpan), TimeSpan requestTimeout = default(TimeSpan))
{
if (refreshInterval == default(TimeSpan))
_refreshInterval = TimeSpan.FromSeconds(60);
else
_refreshInterval = refreshInterval;
if (requestTimeout == default(TimeSpan))
_requestTimeout = TimeSpan.FromSeconds(60);
else
_requestTimeout = requestTimeout;
_actorSystem = actorSystem;
_valid = false;
_path = actorPath;
}
private void RefreshActorRef()
{
lock(_lock)
{
if (!_valid || DateTime.Now.Ticks > _nextRefreshTime)
{
_actorRef = _actorSystem.ActorSelectionOne(_path, _requestTimeout);
Interlocked.Exchange(ref _nextRefreshTime,DateTime.Now.Ticks + _refreshInterval.Ticks);
_valid = true;
}
}
}
public void Invalidate()
{
_valid = false;
}
public void Tell(object message, IActorRef sender)
{
ActorRef.Tell(message, sender);
}
public bool Equals(IActorRef other)
{
return ActorRef.Equals(other);
}
public int CompareTo(IActorRef other)
{
return ActorRef.CompareTo(other);
}
public ISurrogate ToSurrogate(ActorSystem system)
{
return ActorRef.ToSurrogate(system);
}
public int CompareTo(object obj)
{
return ActorRef.CompareTo(obj);
}
}
}