1

我正在尝试跨不同的集群系统进行分布式发布-订阅,但无论我尝试什么,它都不起作用。

我要做的就是创建一个简单的例子。

1)我创建一个主题,说“内容”。

2)说jvm A中的一个节点创建主题,订阅它,以及发布到它的发布者。

3)在不同的节点上,比如在不同的端口上的 jvm B,我创建了一个订阅者。

4)当我从 jvm A 向主题发送消息时,我希望 jvm B 上的订阅者也接收它,因为它订阅了同一主题。

任何帮助将不胜感激,或者是分布式 pub sub 的简单工作示例,其中订阅者和发布者在不同的集群系统中的不同端口上,在 Java 中。

这是 app1 及其配置文件的代码。

 public class App1{

    public static void main(String[] args) {

    System.setProperty("akka.remote.netty.tcp.port", "2551");
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem");
    ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem);
    ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1");
    clusterClientReceptionist1.registerSubscriber("content", subcriber1);
    ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1");
    clusterClientReceptionist1.registerSubscriber("content", publisher1);
    publisher1.tell("testMessage1", ActorRef.noSender());

    }
}

app1.conf

akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
  hostname = "127.0.0.1"
  port = 2551
  }
}
cluster {
seed-nodes = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}

akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
  }
}

app2 及其配置文件的代码

public class App
{
   public static Set<ActorPath> initialContacts() {
   return new HashSet<ActorPath>(Arrays.asList(          
   ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")));
}

public static void main( String[] args ) {
    System.setProperty("akka.remote.netty.tcp.port", "2553");
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2");
    ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem);
    final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
            clusterSystem).withInitialContacts(initialContacts())), "client"); 
    ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2");
    clusterClientReceptionist2.registerSubscriber("content", subcriber2);
    ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2");
    publisher2.tell("testMessage2", ActorRef.noSender());
    clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null);

 }
}            

app2.conf

    akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
  hostname = "127.0.0.1"
  port = 2553
  }
}
cluster {
seed-nodes = [
  "akka.tcp://ClusterSystem@127.0.0.1:2553"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}

akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
  }
}

下面给出的两个应用程序的发布者和订阅者类是相同的。

出版商:

 public class Publisher extends UntypedActor {
 private final ActorRef mediator =
        DistributedPubSub.get(getContext().system()).mediator();

 @Override
 public void onReceive(Object msg) throws Exception {
     if (msg instanceof String) {
         mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf());
    } else {
        unhandled(msg);
    }
 }

}

订户:

public class Subscriber extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

public Subscriber(){

    ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
    mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf());

}

public void onReceive(Object msg) throws Throwable {
    if (msg instanceof String) {
        log.info("Got: {}", msg);
    } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
        log.info("subscribing");
    } else {
        unhandled(msg);
    }
}
}

运行这两个应用程序时,我在接收方应用程序中收到此错误。遇到死信

[ClusterSystem-akka.actor.default-dispatcher-21] INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

并在发送方应用程序消息发送成功显示在日志中。

[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist
4

2 回答 2

1

以这种方式使用 ClusterClient 并没有任何意义,并且与使用分布式 pub sub 没有任何关系,因为您的两个节点都是集群的一部分,您可以直接使用分布式 pub sub api。

这是一个简单的主要内容,包括使用您的确切发布者和订阅者参与者创建一个两节点集群的配置,该参与者按预期工作:

public static void main(String[] args) throws Exception {

  final Config config = ConfigFactory.parseString(
    "akka.actor.provider=cluster\n" +
    "akka.remote.netty.tcp.port=2551\n" +
    "akka.cluster.seed-nodes = [ \"akka.tcp://ClusterSystem@127.0.0.1:2551\"]\n");

  ActorSystem node1 = ActorSystem.create("ClusterSystem", config);
  ActorSystem node2 = ActorSystem.create("ClusterSystem",
    ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
      .withFallback(config));

  // wait a bit for the cluster to form
  Thread.sleep(3000);

  ActorRef subscriber = node1.actorOf(
    Props.create(Subscriber.class),
    "subscriber");

  ActorRef publisher = node2.actorOf(
    Props.create(Publisher.class), 
    "publisher");

  // wait a bit for the subscription to be gossiped
  Thread.sleep(3000);

  publisher.tell("testMessage1", ActorRef.noSender());
}

请注意,分布式 pub sub 不提供任何传递保证,因此如果您在调解员相互联系之前发送消息,则消息将简单地丢失(因此Thread.sleep声明,这不是您应该做的事情实际代码)。

于 2017-02-01T23:05:35.787 回答
0

我认为问题在于您的参与者系统具有不同的名称 ClusterSystem 和 ClusterSystem2。至少我遇到了同样的问题,因为我在集群中有两个不同的服务,但是我用不同的名称为每个服务中的系统命名。

于 2021-03-11T03:09:49.097 回答