我有一个使用 ConcurrentSkipListMap 实现的优先级队列,使用 16 个不同的优先级。
class ConcurrentPriorityQueue {
ConcurrentSkipListMap<Long, Message> queue = new ConcurrentSkipListMap<>();
AtomicLong counter16 = new AtomicLong(Long.MAX_VALUE);
AtomicLong counter15 = new AtomicLong(Long.MAX_VALUE / 8 * 7);
AtomicLong counter14 = new AtomicLong(Long.MAX_VALUE / 4 * 3);
// etc
AtomicLong counter1 = new AtomicLong(Long.MIN_VALUE / 8 * 7);
void addPriority16(Message message) {
queue.put(counter16.getAndDecrement(), message);
}
void addPriority15(Message message) {
queue.put(counter15.getAndDecrement(), message);
}
// and so on
}
这并不是类的确切组织方式(例如,我将 AtomicLongs 放在一个数组中),但我认为这段代码会更清晰。还有一个 DelayQueue 可以删除旧消息或提高旧消息的优先级(取决于消息类型)。
我的问题是我有几个消费者正在使用pollLastEntry()
,以便从队列中删除最高优先级的消息,然后如果队列为空,则进入休眠状态,但问题是队列活动会爆发 - 它会持续一个小时没有包含多个消息,然后在接下来的一个小时内它永远不会为空。因此,我想使用阻塞方法从队列中删除消息,这样我就不会在重复休眠的线程上浪费资源(我会使用指数退避让它们在活动较少时休眠更长时间,但这会使当队列再次启动时它们没有响应),但不清楚实现这一点的最佳方法 - 我有丰富的使用经验阻塞队列,但实现它们的经验为零。我的第一个想法是在休眠的消费者中实现指数退避,然后在队列活动再次启动时中断它们,但我首先想看看是否有更好的方法来做到这一点。