消费者队列是在客户端分配的,代理对此一无所知。
那么我们如何监控哪个队列分配给哪个消费者客户端呢?
虽然没有退出命令,但对于每个消费者组的每个消息队列,您可以使用提供的管理基础设施找到客户端。这是实现此目的的代码段:
private Map<MessageQueue, String> getClientConnection(DefaultMQAdminExt defaultMQAdminExt, String groupName){
Map<MessageQueue, String> results = new HashMap<MessageQueue, String>();
try{
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
for (Connection connection : consumerConnection.getConnectionSet()){
String clinetId = connection.getClientId();
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clinetId, false);
for(MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()){
results.put(messageQueue, clinetId + " " + connection.getClientAddr());
}
}
}catch (Exception e){
}
return results;
}
如果您还没有使用 RocketMQ-Console 项目,请尝试运行它:https ://github.com/rocketmq/rocketmq-console-ng
在 Consumer 选项卡中,点击“consumer detail”按钮,可以直观地看到消息队列分配结果如下: 消息队列分配结果