12

Akka Cluster-Sharding 看起来与我必须跨 Akka 节点创建有状态持久性参与者的单个实例的用例非常匹配。

我不清楚是否有可能有一个需要参数来构造它的 Entry 演员类型。或者,也许我需要重新考虑 Entry 演员如何获取这些信息。

Object Account {
  def apply(region: String, accountId: String): Props = Props(new Account(region, accountId))
}

class Account(val region: String, val accountId: String) extends Actor with PersistentActor { ... }

ClusterSharding.start需要一个 Props 实例来创建所有 Entry Actor。

来自akka 集群分片

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props[Counter]),
  idExtractor = idExtractor,
  shardResolver = shardResolver)

然后它会根据您定义 idExtractor 的方式解析接收消息的 Entry Actor。从 shard 的源代码可以看出,它使用 id 作为给定 Entry actor 实例的名称:

def getEntry(id: EntryId): ActorRef = {
val name = URLEncoder.encode(id, "utf-8")
context.child(name).getOrElse {
  log.debug("Starting entry [{}] in shard [{}]", id, shardId)

  val a = context.watch(context.actorOf(entryProps, name))
  idByRef = idByRef.updated(a, id)
  refById = refById.updated(id, a)
  state = state.copy(state.entries + id)
  a
}

}

看来我应该让我的 Entry 演员通过给定的名称来计算它的区域和 accountId,尽管现在我将从字符串中解析它而不是直接获取值,这确实感觉有点 hacky。这是我最好的选择吗?

4

1 回答 1

12

我和你的情况非常相似。我没有确切的答案,但我可以与您和读者分享我所做/尝试/想到的事情。

选项 1) 正如您所提到的,您可以从命名内容和解析路径的方式中提取 id、shard 和 region 信息。好处是a)它很容易做到。缺点是 a) Akka 将 actor 路径编码为 UTF-8,因此如果您使用任何不是标准 url 字符的分隔符(例如 || 或 w/e),您需要先从 utf8 解码它. 请注意,Akka utf8 内部被硬编码为编码方法,无法像函数一样提取编码格式,因此如果明天 akka 发生变化,您也必须调整您的代码。b)您的系统不再保留同态(您的意思是“感觉有点老套”)。这意味着您正在增加风险,即有一天您的数据可能包含您的信息分隔符字符串作为有意义的数据,并且您的系统可能会出现混乱。

选项 2) 如果 Actor 不存在,Sharding 将生成它。因此,您可以强制您的代码始终向未初始化的参与者发送初始化消息,其中包含您的构造函数参数。你的分片演员里面会有这样的东西:

val par1: Option[param1Type] = None

def receive = {
    case init(par1value) => par1 = Some(par1value)
    case query(par1) => sender ! par1
}

并且从您的区域访问参与者中,您始终可以先发送查询消息,然后如果返回为无,则发送初始化消息。这假设您的区域访问参与者不包含已初始化参与者的列表,在这种情况下,您可以使用 init 生成然后正常使用它们。好处是a)它很优雅b)它“感觉”正确

缺点:a)它需要 2x 消息(如果您不维护初始化演员的列表)

选项 3)此选项已经过测试并且不起作用。我会把它留在这里,让人们避免浪费时间尝试同样的事情。 我不知道这是否有效,我还没有测试过,因为我在生产中使用这个场景有特殊的限制,不允许花哨的东西^_^但是请随时尝试,请通过 pm 或评论告诉我!基本上,您从您的区域开始

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props[Counter]),
  idExtractor = idExtractor,
  shardResolver = shardResolver)

如果您在您的区域创建参与者中执行以下操作怎么办:

var providedPar1 = v1
def providePar1 = providedPar1

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props(classOf[Counter], providePar1),
  idExtractor = idExtractor,
  shardResolver = shardResolver)

然后你为每次创建更改providedPar1 的值?这样做的缺点是,在它起作用的选项中,您需要避免更改providedPar1 的值,直到您100%确定已创建actor,否则您可能会冒险访问新的错误值(耶,比赛条件!)

一般来说,选项 2 imho 会更好,但在大多数情况下,选项 1 引入的风险很小,鉴于简单(和性能)优势,您可以适当地减轻它们。

希望这个咆哮有帮助,如果您尝试 3 次,请告诉我它是如何工作的!

于 2014-10-23T09:03:08.510 回答