我有这种情况,我从数千个来源接收事件。每个来源都在发送有关其当前状态的信息。虽然我确实想处理所有事件,但更重要的是首先处理每个源的最新事件,以便当前视图是最新的。所以我正在考虑使用ConcurrentHashMap
每个源的标识符作为键,并使用 LIFO 队列(堆栈)作为值。然后,我将遍历 的键,然后从Map
每个源的堆栈中弹出一项。
我担心的是,当我遍历键并从每个键的队列中取出项目时,生产者可能会在队列上发布新事件,这可能会产生并发问题。生产者还可以向映射添加新键,并且迭代似乎是弱一致entrySet
的。Map
这不是一个大问题,因为新项目将在后续迭代中处理。理想情况下,我还可以在流上使用一些并行处理entrySet
来加快处理速度。
我想知道是否有更清洁的方法。实际上,我本可以使用 LIFOBlockingDequeue
并首先处理最新事件,但这种方法的问题在于,一个源可能发送比其他源更多的事件,因此可能比其他源处理更多的事件。
是否有任何其他数据结构可以提供这种行为?本质上,我正在寻找一种对来自每个来源的事件进行优先级排序的方法,同时为每个来源提供公平的机会以供消费者处理。