2

我正在使用一个主角色和多个工作角色设置 akka 集群(使用 akka 库版本 2.3.9 )。Master Actor 配置有池集群感知路由器。

部署应用程序,形成 4 个集群节点。最初所有节点都正确加入集群,但几分钟后节点将与集群解除关联并形成自己的集群。

**Getting below warn message logged when enabling remote debug logging -**

**ERROR] []** [ClusterSystem-akka.remote.default-remote-dispatcher-23] [akka.tcp://ClusterSystem
@localhost:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40localhost%3A2552-0/en
dpointWriter] AssociationError [akka.tcp://ClusterSystem@localhost:2551] -> [akka.tcp://ClusterSystem@localhost:2552
]: **Error [Failed to write message to the transport] [
akka.remote.EndpointException: Failed to write message to the transport
Caused by: java.lang.IllegalArgumentException: Can't serialize object of type class org.springframework.context.supp
ort.ClassPathXmlApplicationContext**
    at akka.cluster.protobuf.ClusterMessageSerializer.toBinary(ClusterMessageSerializer.scala:74)
    at akka.serialization.Serialization$$anonfun$serialize$1.apply(Serialization.scala:90)
    at akka.serialization.Serialization$$anonfun$serialize$1.apply(Serialization.scala:90)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.serialize(Serialization.scala:90)
    at akka.remote.serialization.DaemonMsgCreateSerializer.serialize(DaemonMsgCreateSerializer.scala:107)
    at akka.remote.serialization.DaemonMsgCreateSerializer$$anonfun$propsProto$1$1.apply(DaemonMsgCreateSerializ
er.scala:56)
    at akka.remote.serialization.DaemonMsgCreateSerializer$$anonfun$propsProto$1$1.apply(DaemonMsgCreateSerializ
er.scala:56)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at akka.remote.serialization.DaemonMsgCreateSerializer.propsProto$1(DaemonMsgCreateSerializer.scala:56)
    at akka.remote.serialization.DaemonMsgCreateSerializer.toBinary(DaemonMsgCreateSerializer.scala:62)
    at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:842)
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:842)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:841)
    at akka.remote.EndpointWriter.writeSend(Endpoint.scala:742)
    at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:717)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:410)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
4

1 回答 1

1

我在我的 java-spring-akka 应用程序中遇到了同样的问题。我使用 IndirectActorProducer 将演员实例化为 bean(使用范围原型)。这个生产者包含 Spring ApplicationContext。这种方式非常适合集群分片。但是集群环境中的简单池路由器(在我的例子中是循环池)尝试通过集群发送带有 IndirectActorProducer(和 ApplicationContext)的 Props。我可以建议一些对我有用的 hack 解决方法:创建自定义序列化程序并将其定义为“ConfigurableApplicationContext”类。请参见下面的示例:

public class SpringContextSerializer extends JSerializer {
private static ConfigurableApplicationContext configurableApplicationContext;

public static void init(ConfigurableApplicationContext configurableApplicationContext) {
    SpringContextSerializer.configurableApplicationContext = configurableApplicationContext;
}

public SpringContextSerializer() {
    if (configurableApplicationContext == null) {
        throw new RuntimeException("Serializer mist be initialized before creating.");
    }
}

@Override
public int identifier() {
    return getClass().toString().hashCode();
}

@Override
public byte[] toBinary(Object o) {
    return new byte[0];
}

@Override
public boolean includeManifest() {
    return false;
}

@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
    return configurableApplicationContext;
}
}

配置:

akka.actor {
serializers {
  context = "ru.ddg.elleps.hubcache.integration.SpringContextSerializer"
}

serialization-bindings {
  "org.springframework.context.annotation.AnnotationConfigApplicationContext" = context
}
}

此外,您必须在启动 ActorSystem 之前使用容器初始化序列化程序。

于 2016-09-15T17:23:58.853 回答