0

在最近的调查和Stack over flow 问题之后,我意识到集群分片比集群一致哈希路由器更好。但我无法让 2 进程集群运行。

一个进程是种子,另一个是客户端。Seed 节点似乎不断地抛出死信消息(见本问题的结尾)。

这个种子 HOCON 如下:

akka {
loglevel = "INFO"                    

actor {
    provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
    serializers {
        wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
    }
    serialization-bindings {
        "System.Object" = wire
    }
}                    

remote {
    dot-netty.tcp {
        hostname = "127.0.0.1"
        port = 5000
    }
}

persistence {
    journal {
        plugin = "akka.persistence.journal.sql-server"
        sql-server {
            class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
            schema-name = dbo
            auto-initialize = on
            connection-string = "Data Source=localhost;Integrated Security=True;MultipleActiveResultSets=True;Initial Catalog=ClusterExperiment01"
            plugin-dispatcher = "akka.actor.default- dispatcher"
            connection-timeout = 30s
            table-name = EventJournal
            timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
            metadata-table-name = Metadata
        }
    }

    sharding {
        connection-string = "Data Source=localhost;Integrated Security=True;MultipleActiveResultSets=True;Initial Catalog=ClusterExperiment01"
        auto-initialize = on
        plugin-dispatcher = "akka.actor.default-dispatcher"
        class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
        connection-timeout = 30s
        schema-name = dbo
        table-name = ShardingJournal
        timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
        metadata-table-name = ShardingMetadata
    }
}

snapshot-store {
    sharding {
        class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
        plugin-dispatcher = "akka.actor.default-dispatcher"
        connection-string = "Data Source=localhost;Integrated Security=True;MultipleActiveResultSets=True;Initial Catalog=ClusterExperiment01"
        connection-timeout = 30s
        schema-name = dbo
        table-name = ShardingSnapshotStore
        auto-initialize = on
    }
}

cluster {
    seed-nodes = ["akka.tcp://my-cluster-system@127.0.0.1:5000"]
    roles = ["Seed"]

    sharding {
        journal-plugin-id = "akka.persistence.sharding"
        snapshot-plugin-id = "akka.snapshot-store.sharding"
    }
}}

我有一种方法可以将上述内容变成这样的配置:

var config = NodeConfig.Create(/* HOCON above */).WithFallback(ClusterSingletonManager.DefaultConfig());

如果没有“WithFallback”,我会从配置生成中得到一个空引用异常。

然后像这样生成系统:

var system = ActorSystem.Create("my-cluster-system", config);

客户端以相同的方式创建其系统,HOCON 几乎相同,除了:

{
remote {
    dot-netty.tcp {
        hostname = "127.0.0.1"
        port = 5001
    }
}
cluster {
    seed-nodes = ["akka.tcp://my-cluster-system@127.0.0.1:5000"]
    roles = ["Client"]
    role.["Seed"].min-nr-of-members = 1
    sharding {
        journal-plugin-id = "akka.persistence.sharding"
        snapshot-plugin-id = "akka.snapshot-store.sharding"
    }
}}

种子节点像这样创建分片:

ClusterSharding.Get(system).Start(
   typeName: "company-router",
   entityProps: Props.Create(() => new CompanyDeliveryActor()),                    
   settings: ClusterShardingSettings.Create(system),
   messageExtractor: new RouteExtractor(100)
);

客户端创建一个分片代理,如下所示:

ClusterSharding.Get(system).StartProxy(
    typeName: "company-router",
    role: "Seed",
    messageExtractor: new RouteExtractor(100));

RouteExtractor 是:

public class RouteExtractor : HashCodeMessageExtractor
{
    public RouteExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
    {   
    }
    public override string EntityId(object message) => (message as IHasRouting)?.Company?.VolumeId.ToString();
    public override object EntityMessage(object message) => message;
}

在这种情况下,VolumeId 总是相同的(只是为了实验)。

这两个过程都开始了,但种子不断向日志抛出这个错误:

[INFO][7/05/2017 9:00:58 AM][Thread 0003][akka://my-cluster-system/user/sharding /company-routerCoordinator/singleton/coordinator] 来自 akka.tcp 的消息注册: //my-cluster-system@127.0.0.1:5000/user/sharding/company-router 到 akka://my-cl uster-system/user/sharding/company-routerCoordinator/singleton/coordinator 未交付。遇到4个死信。

附言。我没有使用灯塔。

4

2 回答 2

0

快速浏览一下,您将在客户端节点上启动集群分片代理,并告诉它分片节点是使用种子角色的节点。当您没有指定任何角色时,这与种子节点上的集群分片定义不匹配。

由于没有角色可以限制它,因此种子节点上的集群分片将把集群中的所有节点视为完全能够托管分片参与者 - 包括客户端节点,它没有实例化集群分片(非代理)。

这可能不是唯一的问题,但您可以在所有节点上托管集群分片,或者使用ClusterShardingSettings.Create(system).WithRole("seed")将分片限制为集群中的特定节点子集(具有种子角色)。

于 2017-05-07T10:34:05.190 回答
0

谢谢Horusiath,已经解决了:

return sharding.Start(
   typeName: "company-router",
   entityProps: Props.Create(() => new CompanyDeliveryActor()),                    
   settings: ClusterShardingSettings.Create(system).WithRole("Seed"),
                messageExtractor: new RouteExtractor(100)                
            );

集群分片现在正在两个进程之间进行通信。非常感谢那一点。

于 2017-05-08T07:11:27.740 回答