I have 3 Akka nodes (Akka 2.4.8 actor systems) that use remoting (not clustering). When a remote actor is created and runs a long task (more than 30 minutes), I get an error from the remote actor system (on the remote machine) saying that the remote system has quarantined this system.
From the local system:
2016-08-11 03:29:12.748UTC WARN [PLM-akka.actor.default-dispatcher-27] RemoteWatcher | akka.tcp://PLM@flowsvr02:46407/system/remote-watcher | Detected unreachable: [akka.tcp://AS1@lxsvr01g:9500]
2016-08-11 03:29:12.787UTC WARN [PLM-akka.actor.default-dispatcher-14] Remoting | akka.remote.Remoting | Association to [akka.tcp://AS1@lxsvr01g:9500] with UID [1284261532] irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
From the remote system:
00:41:05.169UTC WARN [AS2-akka.actor.default-dispatcher-5] eliableDeliverySupervisor | eEndpointWriter-akka.tcp%3A%2F%2FPLM%40flowsvr02%3A36210-0 | Association with remote system [akka.tcp://PLM@flowsvr02:36210] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
01:06:23.138UTC ERROR [AS2-akka.actor.default-dispatcher-17] EndpointWriter | /endpointWriter-akka.tcp%3A%2F%2FPLM%40ftflowsvr02%3A36210-1 | AssociationError [akka.tcp://AS2@lxsvr02g:9500] <- [akka.tcp://PLM@flowsvr02:36210]: Error [Invalid address: akka.tcp://PLM@flowsvr02:36210] [akka.tcp://PLM@flowsvr02:36210] remote.InvalidAssociation: Invalid address: akka.tcp://PLM@flowsvr02:36210 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
Local code: ...
val remoteConfig = new RemotingConfig("application.conf")
val plmRmRepo = new ResourceManagerDBHandler(config.getString("database.txs_db"))
val remotingManager: ActorRef = system.actorOf(Props(new RemotingManager(plmRmRepo, remoteConfig, system)), name="RemotingManager")
...
val rmWorker: ActorRef = createRemoteActor(request, rm)
requestActor ! ResourceResponse(request.id, request.taskType, request.originalSender, Some(rmWorker))
log.info(s"remote actor is created: " + rmWorker.toString())
...
// Deploys an instance of the requested implementation class onto the remote actor system.
def createRemoteActor(request: ResourceRequest, rm: ResourceManagerClass): ActorRef = {
  log.info("RemotingManager: @" + rm.nodeName + " to create remote actor..." + request.implementation)
  val delegateClass = Class.forName(request.implementation)
  // Address of the remote actor system that should host the new actor.
  val remoteASAddress = Address(rm.protocol, rm.nodeName, rm.host, rm.port)
  system.actorOf(
    Props(delegateClass).withDeploy(Deploy(scope = RemoteScope(remoteASAddress))))
}
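The remote side is simple: it just starts an actor system (with all the class implementations available, of course, since it runs the same jar).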
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemoteMain extends App {
  //val config = ConfigFactory.load("remotesystem.conf")
  val config = ConfigFactory.load()
  val remoteSystemName = config.getString("RemoteSystem.nodeName")
  // create an actor system with that config
  val system = ActorSystem(remoteSystemName, config)
  implicit val executor = system.dispatcher
  //val defaultActor = system.actorOf(Props[RemoteActorSystem], remoteConfig.className)
  system.log.info("## Remote Manager Is Started ##")
}
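The heartbeat between the local (main) actor system and the remote actor system seems to stop working after 30 minutes (a timeout?). How can I fix this?
Thanks,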
=============
More updates: after enabling debug logging for Akka:
From local (master): 2016-08-12 19:14:32.015UTC WARN [PLM-akka.actor.default-dispatcher-5] RemoteWatcher | akka.tcp://PLM@flowsvr02:9888/system/remote-watcher | Detected unreachable: [akka.tcp://AS1@lxsvr01g:9500]
From the remote failing node (slave): 2016-08-12 19:17:31.707UTC WARN [AS1-akka.actor.default-dispatcher-21] eliableDeliverySupervisor | leEndpointWriter-akka.tcp%3A%2F%2FPLM%40flowsvr02%3A9888-0 | Association with remote system [akka.tcp://PLM@flowsvr02:9888] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
application.conf:
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = DEBUG
#logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
#log-config-on-start = on
log-dead-letters = 10
log-dead-letters-during-shutdown = on
logger-startup-timeout = 30s
actor {
serializers {
akka-containers = "akka.remote.serialization.MessageContainerSerializer"
akka-misc = "akka.remote.serialization.MiscMessageSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
}
serialization-bindings {
"akka.actor.ActorSelectionMessage" = akka-containers
# The classes akka.actor.Identify and akka.actor.ActorIdentity serialization/deserialization are required by
# the cluster client to work.
# For the purpose of preserving protocol backward compatibility, akka.actor.Identify and akka.actor.ActorIdentity
# are still using java serialization by default.
# Should java serialization be disabled, uncomment the following lines
# "akka.actor.Identify" = akka-misc
# "akka.actor.ActorIdentity" = akka-misc
# Should java serialization be disabled, uncomment the following lines
# "scala.Some" = akka-misc
# "scala.None$" = akka-misc
"akka.remote.DaemonMsgCreate" = daemon-create
# Since akka.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity.
"akka.protobuf.GeneratedMessage" = proto
# Since com.google.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity.
# This com.google.protobuf serialization binding is only used if the class can be loaded,
# i.e. com.google.protobuf dependency has been added in the application project.
"com.google.protobuf.GeneratedMessage" = proto
}
serialization-identifiers {
"akka.remote.serialization.ProtobufSerializer" = 2
"akka.remote.serialization.DaemonMsgCreateSerializer" = 3
"akka.remote.serialization.MessageContainerSerializer" = 6
"akka.remote.serialization.MiscMessageSerializer" = 16
}
debug {
receive = on
# enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill etc.)
autoreceive = on
# enable DEBUG logging of actor lifecycle changes
lifecycle = on
# enable DEBUG logging of unhandled messages
unhandled = on
}
warn-about-java-serializer-usage = false
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
# If this is "on", Akka will log all outbound messages at DEBUG level
log-sent-messages = on
# If this is "on", Akka will log all inbound messages at DEBUG level
log-received-messages = on
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "ftflowsvr02"
port = 9888
tcp-keepalive = on
}
transport-failure-detector {
implementation-class = "akka.remote.DeadlineFailureDetector"
heartbeat-interval = 5 s
acceptable-heartbeat-pause = 300 s
}
watch-failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a public constructor with a com.typesafe.config.Config and
# akka.actor.EventStream parameter.
implementation-class = "akka.remote.PhiAccrualFailureDetector"
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 5 s
# Defines the failure detector threshold.
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes.
threshold = 300.0
# Number of the samples of inter-heartbeat arrival times to adaptively
# calculate the failure timeout for connections.
max-sample-size = 200
# Minimum standard deviation to use for the normal distribution in
# AccrualFailureDetector. Too low standard deviation might result in
# too much sensitivity for sudden, but normal, deviations in heartbeat
# inter arrival times.
min-std-deviation = 100 ms
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.
acceptable-heartbeat-pause = 300 s
# How often to check for nodes marked as unreachable by the failure
# detector
unreachable-nodes-reaper-interval = 5s
# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat message has
# been received.
expected-response-after = 5 s
}
retry-gate-closed-for = 60 s
quarantine-after-silence = 5 d
resend-interval = 5 s
resend-limit = 200
default-remote-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
parallelism-max = 2
}
}
}
}
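
To get a feel for what the watch-failure-detector values above mean in practice, here is a minimal sketch that estimates how long heartbeats would have to be silent before the remote node is marked unreachable. It assumes the phi formula I understand Akka's PhiAccrualFailureDetector to use (a logistic approximation of the normal distribution, with acceptable-heartbeat-pause added to the mean) and assumes perfectly regular heartbeats, so the standard deviation falls back to min-std-deviation. `PhiSketch` and its members are hypothetical names for illustration only; this is not Akka's API.

```scala
object PhiSketch {
  // Assumed phi calculation: logistic approximation of the normal CDF tail.
  def phi(timeDiffMs: Double, meanMs: Double, stdDevMs: Double): Double = {
    val y = (timeDiffMs - meanMs) / stdDevMs
    val e = math.exp(-y * (1.5976 + 0.070566 * y * y))
    if (timeDiffMs > meanMs) -math.log10(e / (1.0 + e))
    else -math.log10(1.0 - 1.0 / (1.0 + e))
  }

  // Values copied from the watch-failure-detector block in application.conf.
  val heartbeatIntervalMs = 5000.0
  val acceptablePauseMs   = 300000.0
  val minStdDevMs         = 100.0
  val threshold           = 300.0

  // Assumption: heartbeats arrived exactly every 5 s before going silent,
  // so the sampled mean equals the interval and the pause is added on top.
  val effectiveMeanMs = heartbeatIntervalMs + acceptablePauseMs

  // First silence duration (checked in 100 ms steps) at which phi reaches the threshold.
  def silenceUntilUnreachableMs: Double =
    Iterator.iterate(0.0)(_ + 100.0)
      .find(t => phi(t, effectiveMeanMs, minStdDevMs) >= threshold)
      .get

  def main(args: Array[String]): Unit =
    println(s"Unreachable after about ${silenceUntilUnreachableMs / 1000.0} s of silence")
}
```

Under these assumptions the result is only a couple of seconds above heartbeat-interval plus acceptable-heartbeat-pause (a little over 5 minutes), so the very high threshold contributes little and acceptable-heartbeat-pause dominates. If that reading is right, the "Detected unreachable" warning would mean heartbeats were missing for several minutes, not that some 30-minute timer expired.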