我和我的同事一直对 DistributedPubSubMediator 直接或通过代理 Actor 订阅/取消订阅的不同行为感到困惑。我们进行了测试以显示下面的不同结果。
根据我们的理解,ActorRef.forward 应该传入原始发送者,因此消息是直接发送到 Mediator 还是通过代理 Actor 无关紧要。IE。http://www.scala-lang.org/api/current/index.html#scala.actors.ActorRef。
要解决此问题,我们必须扩展 DIstributedPubSubMediator 类并包含已提供的逻辑 DistributedPubSubMediator 对象。理想情况下,我们更愿意直接使用该对象并还原我们的代码。
这似乎是一个错误。有谁知道这种异常行为的根本原因?请帮忙...
[2013 年 10 月 22 日] 根据 Roland 的回答(谢谢)更新了测试,并在 SubscriberAck 和 UnsubscribeAck 上添加了 expectMsgType。我们现在收到了 SubscribeAck,但奇怪的是没有收到 UnSubscribeAck。这不是一个大问题,但我们想知道为什么。
另一个问题,如果我们可以一起问的话,通过在同一个 ActorSystem 中运行的代理 Actor 将远程 Actor 订阅到 DistributedPubSubMediator 是否是一种好习惯?
目前我们有:
- 订阅 App 发现发布 App(以非 Akka 方式)并获取集群地址。
- 远程订阅者使用此地址和已知代理参与者的路径来发送身份请求。
- 远程订阅者获取 ActorIdentity 响应,然后通过此(远程)代理订阅/取消订阅。
- 在发布者 App 上,订阅/取消订阅消息被转发到 DistributedPubSubMediator,用于发布后续业务消息。
我们没有按照 Akka Reactor pubsub 聊天客户端示例加入集群(即仅使用 DistributedPubSubMediator 发布),因为我们需要在 Publisher 端处理故障转移。
[2013 年 11 月 5 日] 添加了发送消息的测试。它似乎不起作用,我们还没有弄清楚。
package star.common.pubsub
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.junit.runner.RunWith
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator._
import akka.testkit.TestKit
import akka.actor.{Actor, ActorSystem, ActorRef, Props}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
object MediatorTest {
val config = ConfigFactory.parseString(s"""
akka.actor.provider="akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port=0
akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
""")
}
@RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MediatorTest extends TestKit(ActorSystem("test", MediatorTest.config)) with FunSuite {
val mediator = DistributedPubSubExtension(system).mediator
val topic = "example"
val message = "Published Message"
// val joinAddress = Cluster(system).selfAddress
// Cluster(system).join(joinAddress)
test("Direct subscribe to mediator") {
mediator.!(Subscribe(topic, testActor))(testActor)
expectMsgType[SubscribeAck](5 seconds)
mediator.!(Publish(topic, message))(testActor)
expectMsg(2 seconds, message)
mediator.!(Unsubscribe(topic, testActor))(testActor)
expectMsgType[UnsubscribeAck](5 seconds)
mediator ! Publish(topic, message)
expectNoMsg(2 seconds)
}
test("Subscribe to mediator via proxy") {
class Proxy extends Actor {
override def receive = {
case subscribe: Subscribe =>
mediator forward subscribe
case unsubscribe: Unsubscribe =>
mediator forward unsubscribe
case publish: Publish =>
mediator.!(publish)
}
}
val proxy = system.actorOf(Props(new Proxy), "proxy")
proxy.!(Subscribe(topic,testActor))(testActor)
expectMsgType[SubscribeAck](2 seconds)
proxy ! Publish(topic, message)
expectMsg(5 seconds, message)
proxy.!(Unsubscribe(topic,testActor))(testActor)
expectMsgType[UnsubscribeAck](5 seconds)
proxy ! Publish(topic, message)
expectNoMsg(5 seconds)
}
test("Send message to address") {
val testActorAddress = testActor.path.toString
// val system2 = ActorSystem("test", MediatorTest.config)
// Cluster(system2).join(joinAddress)
mediator.!(Subscribe(topic, testActor))(testActor)
expectMsgType[SubscribeAck](5 seconds)
println(testActorAddress) // akka://test/system/testActor1
mediator.!(Publish(topic, message))(testActor)
expectMsg(2 seconds, message)
mediator ! Send(testActorAddress, message, false)
expectMsg(5 seconds, message)
}
}