我正在构建一个创建主题树的扩散解决方案。
我正在按需创建主题以反映从 RabbitMQ 提要收到的值。每个主题都有内存成本,因此我希望在一段时间没有订阅者后删除该主题。
如何使用统一的 Java API 做到这一点?
我正在构建一个创建主题树的扩散解决方案。
我正在按需创建主题以反映从 RabbitMQ 提要收到的值。每个主题都有内存成本,因此我希望在一段时间没有订阅者后删除该主题。
如何使用统一的 Java API 做到这一点?
TopicEventListener (TopicControl 特性的一部分)提供了这个功能。为主题有 0 个订阅者以及主题至少有 1 个订阅者时提供单独的回调。
例子:
public TopicEventListenerClient() {
session = Diffusion.sessions().principal("admin").password("password").open("ws://localhost:8080");
topicControl = session.feature(TopicControl.class);
topicControl.addTopicEventListener("rabbitMQ/foo", new TopicEventListener() {
@Override
public void onClose(String arg0) {
LOG.info("Listener closed");
}
@Override
public void onError(String arg0, ErrorReason arg1) {
LOG.info("Error on listener: " + arg1);
}
@Override
public void onRegistered(String arg0, Registration arg1) {
LOG.info("Listener registered");
}
@Override
public void onHasSubscribers(String arg0) {
LOG.info("Topic: " + arg0 + " has at least 1 subscriber");
}
@Override
public void onNoSubscribers(String arg0) {
LOG.info("Topic: " + arg0 + " has no subscribers");
}
});
}
希望这可以帮助!
编辑:2019 年 4 月 10 日
不推荐使用TopicEventListener(从 Diffusion 版本 6.1 开始)。删除没有订阅者的主题的首选方法是自动主题删除。这可以通过将删除策略指定为主题规范的一部分来实现。
例如:
final Session session = Diffusion.sessions().principal("admin").password("password").open("ws://localhost:8080");
final TopicControl topicControl = session.feature(TopicControl.class);
final TopicSpecification specification =
topicControl.newSpecification(TopicType.JSON)
.withProperty(TopicSpecification.REMOVAL, "when subscriptions < 1 for 10s");
上面的代码为一个 JSON 主题创建了一个主题规范,如果 10 秒内没有订阅者,该主题将被自动删除。