2

出于某种原因,在以下代码中,destinationSource.getQueues()返回的是 aCopyOnWriteArraySet而不是简单的Set. 这是一个问题,因为 for 循环在满之前开始处理,Set并且由于它的性质,CopyOnWriteArraySet它只会处理Setbefore 循环中的项目。我知道我可以Thread.sleep()在那里扔一个,但这并不能解决根本问题。是否有任何理由将其作为 aCopyOnWriteArraySet而不是 a返回Set?还有什么方法可以迭代 aCopyOnWriteArraySet以确保覆盖所有项目,即使是在迭代期间添加的项目?

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
activeMQConnection.start();
DestinationSource destinationSource = activeMQConnection.getDestinationSource();

Set<ActiveMQQueue> queues = destinationSource.getQueues();

for(ActiveMQQueue queue : queues) {
  queueNames.add(queue.getPhysicalName());
}

activeMQConnection.close()

编辑:这是我提出的解决方案,虽然它并不完美,但它确保您将获得所有队列,直到添加队列之间的时间超过 1 秒。

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

    ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();

    activeMQConnection.start();

    DestinationSource destinationSource = activeMQConnection.getDestinationSource();

    Set<ActiveMQQueue> queues = destinationSource.getQueues();

    do {
        for(ActiveMQQueue queue : queues) {
            String physcialName = queue.getPhysicalName();
            if(!queueNames.contains(physcialName)) {
                queueNames.add(physcialName);
            }
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log(e.toString());
        }
    }while(queueNames.size() < queues.size());

    activeMQConnection.close();
4

2 回答 2

3

从连接中获取所有队列时我遇到了同样的问题。每当我从 DestinationSource 获得队列,然后在该集合上进行迭代(foreach)时,我得到不同数量的队列(在迭代循环中,我总是得到比初始集合更多的队列)。

DestinationSource ds = connection.getDestinationSource();
Set<ActiveMQQueue> queues = ds.getQueues();
log.debug("Found '" + queues.size() + "' queues");
for (ActiveMQQueue queue : queues) {...}

然后,我像这样向目标源添加了一个侦听器

DestinationSource ds = connection.getDestinationSource();
Set<ActiveMQQueue> queues = ds.getQueues();
// Add listener:
ds.setDestinationListener(event -> event.hashCode());
log.debug("Found '" + queues.size() + "' queues");
for (ActiveMQQueue queue : queues) {...}

从现在开始,我总是得到正确数量的队列,并且可以遍历整个队列。

虽然,我真的不知道为什么;)

于 2016-06-02T12:32:11.180 回答
1

正如您所说,以这种方式运行是CopyOnWriteArraySet的本质。队列列表可以与您的线程同时更改。通过返回一个CopyOnWriteArraySetActiveMQ 给你一个可以在你的线程中安全使用的数据结构(不改变ConcurrentModificationException)并且一个保持最新的数据结构。

由于可以随时添加新队列,因此无法“等待”它们全部完成。

如果您想知道何时添加了新队列,然后做一些响应,最好的方法是监听相应的 ActiveMQ咨询消息。该工具将让您响应消息队列的添加、消费者和生产者的添加以及它们的删除。思考链接有一个代码示例。

于 2015-07-08T18:56:09.823 回答