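I have 3 Akka nodes (Akka 2.4.8 actor systems) that use remoting, not clustering. When a remote actor is created and runs a long task (more than 30 minutes), I get an error from the remote actor system (on the remote machine) saying that the remote system has quarantined this system: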

From the local system:

2016-08-11 03:29:12.748UTC WARN [PLM-akka.actor.default-dispatcher-27] RemoteWatcher | akka.tcp://PLM@flowsvr02:46407/system/remote-watcher | Detected unreachable: [akka.tcp://AS1@lxsvr01g:9500]

2016-08-11 03:29:12.787UTC WARN [PLM-akka.actor.default-dispatcher-14] Remoting | akka.remote.Remoting | Association to [akka.tcp://AS1@lxsvr01g:9500] having UID [1284261532] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.

From the remote system:

00:41:05.169UTC WARN [AS2-akka.actor.default-dispatcher-5] eliableDeliverySupervisor | eEndpointWriter-akka.tcp%3A%2F%2FPLM%40flowsvr02%3A36210-0 | Association with remote system [akka.tcp://PLM@flowsvr02:36210] has failed, address is now gated for [5000] ms. Reason: [Disassociated]

01:06:23.138UTC ERROR [AS2-akka.actor.default-dispatcher-17] EndpointWriter | /endpointWriter-akka.tcp%3A%2F%2FPLM%40ftflowsvr02%3A36210-1 | AssociationError [akka.tcp://AS2@lxsvr02g:9500] <- [akka.tcp://PLM@flowsvr02:36210]: Error [Invalid address: akka.tcp://PLM@flowsvr02:36210] [akka.remote.InvalidAssociation: Invalid address: akka.tcp://PLM@flowsvr02:36210 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

Local code: ...

      val remoteConfig = new RemotingConfig("application.conf")
      val plmRmRepo = new ResourceManagerDBHandler(config.getString("database.txs_db"))
      val remotingManager: ActorRef = system.actorOf(Props(new RemotingManager(plmRmRepo, remoteConfig, system)), name="RemotingManager")

...

      val rmWorker: ActorRef = createRemoteActor(request, rm)
      requestActor ! ResourceResponse(request.id, request.taskType, request.originalSender, Some(rmWorker))
      log.info(s"remote actor is created: $rmWorker")

...

      def createRemoteActor(request: ResourceRequest, rm: ResourceManagerClass): ActorRef = {
          log.info(s"RemotingManager: @${rm.nodeName} to create remote actor... ${request.implementation}")
          val delegateClass = Class.forName(request.implementation)
          val remoteASAddress = Address(rm.protocol, rm.nodeName, rm.host, rm.port)
          // Deploy the actor on the remote actor system instead of the local one
          system.actorOf(Props(delegateClass).
               withDeploy(Deploy(scope = RemoteScope(remoteASAddress))))
      }

The remote side is simple: it just starts an actor system (with all the class implementations available, of course, from the same jar).

      object RemoteMain extends App {
        //val config = ConfigFactory.load("remotesystem.conf")
        val config = ConfigFactory.load()
        val remoteSystemName = config.getString("RemoteSystem.nodeName")

        // create an actor system with that config
        val system = ActorSystem(remoteSystemName, config)
        implicit val executor = system.dispatcher

        //val defaultActor = system.actorOf(Props[RemoteActorSystem], remoteConfig.className)
        system.log.info("## Remote Manager Is Started ##")
      }

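It looks like the heartbeat between the local (master) actor system and the remote actor system stops working after 30 minutes (a timeout?). How can I fix this?

Thanks,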

=============

More updates: after enabling DEBUG logging for Akka:

From the local system (master):

2016-08-12 19:14:32.015UTC WARN [PLM-akka.actor.default-dispatcher-5] RemoteWatcher | akka.tcp://PLM@flowsvr02:9888/system/remote-watcher | Detected unreachable: [akka.tcp://AS1@lxsvr01g:9500]

From the failed remote node (slave):

2016-08-12 19:17:31.707UTC WARN [AS1-akka.actor.default-dispatcher-21] eliableDeliverySupervisor | leEndpointWriter-akka.tcp%3A%2F%2FPLM%40flowsvr02%3A9888-0 | Association with remote system [akka.tcp://PLM@flowsvr02:9888] has failed, address is now gated for [5000] ms. Reason: [Disassociated]

application.conf:

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = DEBUG
  #logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  #log-config-on-start = on
  log-dead-letters = 10
  log-dead-letters-during-shutdown = on
  logger-startup-timeout = 30s
  actor {
     serializers {
      akka-containers = "akka.remote.serialization.MessageContainerSerializer"
      akka-misc = "akka.remote.serialization.MiscMessageSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
      daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
    }

    serialization-bindings {
      "akka.actor.ActorSelectionMessage" = akka-containers
      # The classes akka.actor.Identify and akka.actor.ActorIdentity serialization/deserialization are required by
      # the cluster client to work.
      # For the purpose of preserving protocol backward compatibility, akka.actor.Identify and akka.actor.ActorIdentity
      # are still using java serialization by default.
      # Should java serialization be disabled, uncomment the following lines
      # "akka.actor.Identify" = akka-misc
      # "akka.actor.ActorIdentity" = akka-misc
      # Should java serialization be disabled, uncomment the following lines
      # "scala.Some" = akka-misc
      # "scala.None$" = akka-misc
      "akka.remote.DaemonMsgCreate" = daemon-create

      # Since akka.protobuf.Message does not extend Serializable but
      # GeneratedMessage does, need to use the more specific one here in order
      # to avoid ambiguity.
      "akka.protobuf.GeneratedMessage" = proto

      # Since com.google.protobuf.Message does not extend Serializable but
      # GeneratedMessage does, need to use the more specific one here in order
      # to avoid ambiguity.
      # This com.google.protobuf serialization binding is only used if the class can be loaded,
      # i.e. com.google.protobuf dependency has been added in the application project.
      "com.google.protobuf.GeneratedMessage" = proto

    }

    serialization-identifiers {
      "akka.remote.serialization.ProtobufSerializer" = 2
      "akka.remote.serialization.DaemonMsgCreateSerializer" = 3
      "akka.remote.serialization.MessageContainerSerializer" = 6
      "akka.remote.serialization.MiscMessageSerializer" = 16
    }

    debug {
      receive = on
      # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill etc.)
      autoreceive = on
      # enable DEBUG logging of actor lifecycle changes
      lifecycle = on
      # enable DEBUG logging of unhandled messages
      unhandled = on
    }

    warn-about-java-serializer-usage = false

    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    # If this is "on", Akka will log all outbound messages at DEBUG level
    log-sent-messages = on
    # If this is "on", Akka will log all inbound messages at DEBUG level
    log-received-messages = on

    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "ftflowsvr02"
      port = 9888
      tcp-keepalive = on      
    }
    transport-failure-detector {
      implementation-class = "akka.remote.DeadlineFailureDetector"
      heartbeat-interval = 5 s
      acceptable-heartbeat-pause = 300 s      
    }   

    watch-failure-detector {
      # FQCN of the failure detector implementation.
      # It must implement akka.remote.FailureDetector and have
      # a public constructor with a com.typesafe.config.Config and
      # akka.actor.EventStream parameter.
      implementation-class = "akka.remote.PhiAccrualFailureDetector"

      # How often keep-alive heartbeat messages should be sent to each connection.
      heartbeat-interval = 5 s

      # Defines the failure detector threshold.
      # A low threshold is prone to generate many wrong suspicions but ensures
      # a quick detection in the event of a real crash. Conversely, a high
      # threshold generates fewer mistakes but needs more time to detect
      # actual crashes.
      threshold = 300.0

      # Number of the samples of inter-heartbeat arrival times to adaptively
      # calculate the failure timeout for connections.
      max-sample-size = 200

      # Minimum standard deviation to use for the normal distribution in
      # AccrualFailureDetector. Too low standard deviation might result in
      # too much sensitivity for sudden, but normal, deviations in heartbeat
      # inter arrival times.
      min-std-deviation = 100 ms

      # Number of potentially lost/delayed heartbeats that will be
      # accepted before considering it to be an anomaly.
      # This margin is important to be able to survive sudden, occasional,
      # pauses in heartbeat arrivals, due to for example garbage collect or
      # network drop.
      acceptable-heartbeat-pause = 300 s


      # How often to check for nodes marked as unreachable by the failure
      # detector
      unreachable-nodes-reaper-interval = 5s

      # After the heartbeat request has been sent the first failure detection
      # will start after this period, even though no heartbeat message has
      # been received.
      expected-response-after = 5 s

    }

    retry-gate-closed-for = 60 s

    quarantine-after-silence = 5 d

    resend-interval = 5 s   

    resend-limit = 200  

    default-remote-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 2
        parallelism-max = 2
      }
    }   

  }
}
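
To make the two failure-detector sections above more concrete, here is a minimal sketch of how those settings translate into a tolerance for missing heartbeats. It is a simplified model of how Akka 2.4's `DeadlineFailureDetector` and `PhiAccrualFailureDetector` are understood to evaluate heartbeats, not Akka's actual classes. The object and function names are hypothetical, and the mean heartbeat interval of 5000 ms and standard deviation of 100 ms used in the usage note are assumptions taken from `heartbeat-interval` and `min-std-deviation`.

```scala
object FailureDetectorSketch {

  // transport-failure-detector (deadline style): the peer counts as available
  // until heartbeat-interval + acceptable-heartbeat-pause has passed in silence.
  def deadlineMillis(heartbeatIntervalMs: Long, acceptablePauseMs: Long): Long =
    heartbeatIntervalMs + acceptablePauseMs

  def isAvailable(lastHeartbeatMs: Long, nowMs: Long, deadlineMs: Long): Boolean =
    lastHeartbeatMs + deadlineMs > nowMs

  // watch-failure-detector (phi accrual style): phi grows with the time since the
  // last heartbeat. The mean interval is shifted by acceptable-heartbeat-pause,
  // and the node is treated as unreachable once phi reaches the threshold.
  // The constants are a logistic approximation of the normal distribution's CDF.
  def phi(timeSinceLastHeartbeatMs: Long,
          meanIntervalMs: Double,
          stdDevMs: Double,
          acceptablePauseMs: Long): Double = {
    val shiftedMean = meanIntervalMs + acceptablePauseMs
    val y = (timeSinceLastHeartbeatMs - shiftedMean) / stdDevMs
    val e = math.exp(-y * (1.5976 + 0.070566 * y * y))
    if (timeSinceLastHeartbeatMs > shiftedMean) -math.log10(e / (1.0 + e))
    else -math.log10(1.0 - 1.0 / (1.0 + e))
  }
}
```

With the values from this configuration, `FailureDetectorSketch.deadlineMillis(5000, 300000)` gives 305000 ms. In the phi model, 10 s of silence yields a phi near 0, while 310 s of silence yields a phi far above `threshold = 300.0`. Under these assumptions both detectors tolerate roughly five minutes without heartbeats, regardless of how long the task itself runs.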