从简单的角度来看,没有什么可以禁止您在没有额外参与者的情况下将远程参与者订阅到您的事件流。Akka 文档提到:
事件流是一个本地设施,这意味着它不会将事件分发到集群环境中的其他节点(除非您明确订阅远程 Actor 到流)。如果您需要在 Akka 集群中广播事件,而无需明确知道您的接收者(即获取他们的 ActorRefs),您可能需要查看:集群中的分布式发布订阅。
出于说明目的,请考虑以下与您要订阅的远程系统相对应的代码片段:
class PublisherActor extends Actor with ActorLogging { // example publisher actor just to generate some logs
context.system.scheduler.schedule(1.second, 1.second, self, "echo")
def receive = {
case "echo" ⇒
val x = Random.nextInt(100)
log.info(s"I got a random number: $x")
}
}
def runPublisher() = {
println("=== running publisher node ===")
val system = ActorSystem("PublisherSystem")
val selection = system.actorSelection("akka.tcp://SubscriberSystem@127.0.0.1:2553/user/subscriber")
selection.resolveOne(10.seconds) onSuccess { // when the listener actor is available,
case listener ⇒ system.eventStream.subscribe(listener, classOf[LogEvent]) // subscribe it to the event stream
}
val publisher = system.actorOf(Props[PublisherActor], "publisher") // some example publisher
}
然后是“本地”节点中的相应订阅者,您要从中显示日志:
class SubscriberActor extends Actor with ActorLogging {
log.info("subscriber listening...")
def receive = {
case msg ⇒ log.info(s"Got: $msg")
}
}
def runSubscriber() = {
println("=== running subscriber node ===")
val system = ActorSystem("SubscriberSystem")
val listener = system.actorOf(Props[SubscriberActor], "subscriber")
}
然而,这个解决方案有几个注意事项,因为发布者必须在订阅者之前运行(或者订阅者在发布者启动之前实施一些重试策略),位置是硬编码的等等。如果你想拥有一个更健壮和有弹性的系统并且它是允许的,请遵循文档中的建议并在集群环境中使用分布式发布者 - 订阅者,这具有类似数量的样板文件的几个优点。
希望它有所帮助!