5

是我愚蠢 - 我没有将索引器道具传递给系统创建。我会在这里留下答案,以防有人从中受益*

我正在创建一个单例并发送这样的消息:

 val indexerProps = ClusterSingletonManager.props(had => Props(
            classOf[SingleCoreIndexer], dataProvider, publisher, name), name, End, None)

        val coreIndexer = system.actorOf(indexerProps, name)
        //val coreIndexer = system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))

        coreIndexer ! "start_indexing"

注释掉的行显示了工作正常的非单例道具

当我运行该应用程序时,我收到以下错误:

[WARN] [06/21/2013 11:55:32.443] [deadcoreindexerstest-akka.actor.default-dispatcher-5] [akka://deadcoreindexerstest/user/node1] unhandled event start_indexing in state Start

所有其他功能都停止工作,这与暗示“coreIndexer”演员没有收到“start_indexing”消息的消息相关

更多代码:

class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {

    def start {
        val system = systemCreator.create
        val dataProvider = system.actorOf(dataProviderProps)
        val publisher = system.actorOf(publisherProps)

        val indexerProps = ClusterSingletonManager.props(
            singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
            singletonName = "aaa",
            terminationMessage = End,
            role = None
            )

        val coreIndexer = system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))
        coreIndexer ! "start_indexing"
    }
}



class SingleCoreIndexer(dataProvider: ActorRef, publisher: ActorRef, name: String) extends Actor {

    def receive = {

        case "start_indexing" => {
            println("Single core indexer starting indexing")
            dataProvider ! new NextBatchOfDataPlease
        }

        case BatchOfData(data) => {
            publisher ! (name, data)
            self ! "next_batch"
        }

        case "next_batch" => {
            dataProvider ! new NextBatchOfDataPlease
        }
    }
}

看起来我正在向经理而不是单身人士发送消息。但是,当我向单例发送消息时,什么也没有发生:

class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {

    def start {
        val system = systemCreator.create
        val dataProvider = system.actorOf(dataProviderProps)
        val publisher = system.actorOf(publisherProps)

        val indexerProps = ClusterSingletonManager.props(
            singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
            singletonName = "singlecoreindexer",
            terminationMessage = End,
            role = None
            )

        system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))
        val coreIndexer = system.actorSelection(s"/user/$name/singlecoreindexer")
        coreIndexer ! "start_indexing"
    }
}
4

2 回答 2

4

我知道这已经解决了,但是即使有了这里提供的信息,我仍然花了一段时间才弄清楚如何向单身人士发送消息,并认为我会留下我在这里找到的

集群单例文档没有很好解释的两个关键概念是:

  1. 创建的actorClusterSingletonManager.props是实际实例的父级,并且
  2. 你应该只使用那个演员的地址来创建一个ClusterSingletonProxy

集群中的每个节点都将创建单例管理器,最终胜出的最老的节点是您实际想要与之交谈的单例的父节点。ClusterSingletonProxy确保您正在与实际的代理交谈,并且即使单例暂时不可用或迁移到另一个节点,您也始终在与正确的实例交谈。

鉴于该信息,代码应为:

class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {

  def start {
    val system = systemCreator.create
    val dataProvider = system.actorOf(dataProviderProps)
    val publisher = system.actorOf(publisherProps)

    val indexerProps = ClusterSingletonManager.props(
      singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
      singletonName = "singlecoreindexer",
      terminationMessage = End,
      role = None
    )

    val singletonManager = system.actorOf(
      Props(classOf[SingleCoreIndexer],dataProvider, publisher, name)
    )

    val indexerPath = (singletonManager.path / name)
    val coreIndexer = system.actorOf(
      ClusterSingletonProxy.props(indexerPath, None),
      s"$name-proxy"
    )

    coreIndexer ! "start_indexing"
  }
}
于 2015-07-08T00:15:33.543 回答
2

您看到的问题(我认为)源于您向ClusterSingletonManager坐在它下面的实际演员而不是向您发送消息。如果按名称(),请尝试在下面查找演员actorFor,它应该可以工作。

于 2013-06-21T12:55:39.743 回答