0

我和我的同事一直对 DistributedPubSubMediator 直接或通过代理 Actor 订阅/取消订阅的不同行为感到困惑。我们进行了测试以显示下面的不同结果。

根据我们的理解,ActorRef.forward 应该传入原始发送者,因此消息是直接发送到 Mediator 还是通过代理 Actor 无关紧要。IE。http://www.scala-lang.org/api/current/index.html#scala.actors.ActorRef

要解决此问题,我们必须扩展 DIstributedPubSubMediator 类并包含已提供的逻辑 DistributedPubSubMediator 对象。理想情况下,我们更愿意直接使用该对象并还原我们的代码。

这似乎是一个错误。有谁知道这种异常行为的根本原因?请帮忙...

[2013 年 10 月 22 日] 根据 Roland 的回答(谢谢)更新了测试,并在 SubscriberAck 和 UnsubscribeAck 上添加了 expectMsgType。我们现在收到了 SubscribeAck,但奇怪的是没有收到 UnSubscribeAck。这不是一个大问题,但我们想知道为什么。

另一个问题,如果我们可以一起问的话,通过在同一个 ActorSystem 中运行的代理 Actor 将远程 Actor 订阅到 DistributedPubSubMediator 是否是一种好习惯?

目前我们有:

  1. 订阅 App 发现发布 App(以非 Akka 方式)并获取集群地址。
  2. 远程订阅者使用此地址和已知代理参与者的路径来发送身份请求。
  3. 远程订阅者获取 ActorIdentity 响应,然后通过此(远程)代理订阅/取消订阅。
  4. 在发布者 App 上,订阅/取消订阅消息被转发到 DistributedPubSubMediator,用于发布后续业务消息。

我们没有按照 Akka Reactor pubsub 聊天客户端示例加入集群(即仅使用 DistributedPubSubMediator 发布),因为我们需要在 Publisher 端处理故障转移。

[2013 年 11 月 5 日] 添加了发送消息的测试。它似乎不起作用,我们还没有弄清楚。

package star.common.pubsub

import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.junit.runner.RunWith

import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator._
import akka.testkit.TestKit
import akka.actor.{Actor, ActorSystem, ActorRef, Props}
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory

object MediatorTest {
  val config = ConfigFactory.parseString(s"""
                              akka.actor.provider="akka.cluster.ClusterActorRefProvider"
                              akka.remote.netty.tcp.port=0
                              akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
                              """)
}

@RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MediatorTest extends TestKit(ActorSystem("test", MediatorTest.config)) with FunSuite {

  val mediator = DistributedPubSubExtension(system).mediator
  val topic = "example"
  val message = "Published Message"
  //  val joinAddress = Cluster(system).selfAddress
  //  Cluster(system).join(joinAddress)

  test("Direct subscribe to mediator") {
    mediator.!(Subscribe(topic, testActor))(testActor)
    expectMsgType[SubscribeAck](5 seconds)

    mediator.!(Publish(topic, message))(testActor)
    expectMsg(2 seconds, message)

    mediator.!(Unsubscribe(topic, testActor))(testActor)
    expectMsgType[UnsubscribeAck](5 seconds)

    mediator ! Publish(topic, message)
    expectNoMsg(2 seconds)
  }


  test("Subscribe to mediator via proxy") {
    class Proxy extends Actor {
      override def receive = {
        case subscribe: Subscribe =>
          mediator forward subscribe

        case unsubscribe: Unsubscribe =>
          mediator forward unsubscribe

        case publish: Publish =>
          mediator.!(publish)
      }
    } 

    val proxy = system.actorOf(Props(new Proxy), "proxy")

    proxy.!(Subscribe(topic,testActor))(testActor)
    expectMsgType[SubscribeAck](2 seconds)

    proxy ! Publish(topic, message)
    expectMsg(5 seconds, message)

    proxy.!(Unsubscribe(topic,testActor))(testActor)
    expectMsgType[UnsubscribeAck](5 seconds)

    proxy ! Publish(topic, message)
    expectNoMsg(5 seconds)
  }

  test("Send message to address") {

    val testActorAddress = testActor.path.toString
    //    val system2 = ActorSystem("test", MediatorTest.config)
    //    Cluster(system2).join(joinAddress)

    mediator.!(Subscribe(topic, testActor))(testActor)
    expectMsgType[SubscribeAck](5 seconds)

    println(testActorAddress) // akka://test/system/testActor1

    mediator.!(Publish(topic, message))(testActor)
    expectMsg(2 seconds, message)

    mediator ! Send(testActorAddress, message, false)

    expectMsg(5 seconds, message)
  }
}
4

1 回答 1

1

两件事情:

  • 您是否使用forward并不重要,因为您在测试过程中没有有用的发件人(您没有混入ImplicitSender);但这不是问题
  • 您没有转发Publish消息,这就是它不发布消息的原因
于 2013-10-22T06:35:49.430 回答