2

我有这种情况,我从数千个来源接收事件。每个来源都在发送有关其当前状态的信息。虽然我确实想处理所有事件,但更重要的是首先处理每个源的最新事件,以便当前视图是最新的。所以我正在考虑使用ConcurrentHashMap每个源的标识符作为键,并使用 LIFO 队列(堆栈)作为值。然后,我将遍历 的键,然后从Map每个源的堆栈中弹出一项。

我担心的是,当我遍历键并从每个键的队列中取出项目时,生产者可能会在队列上发布新事件,这可能会产生并发问题。生产者还可以向映射添加新键,并且迭代似乎是弱一致entrySet的。Map这不是一个大问题,因为新项目将在后续迭代中处理。理想情况下,我还可以在流上使用一些并行处理entrySet来加快处理速度。

我想知道是否有更清洁的方法。实际上,我本可以使用 LIFOBlockingDequeue并首先处理最新事件,但这种方法的问题在于,一个源可能发送比其他源更多的事件,因此可能比其他源处理更多的事件。

是否有任何其他数据结构可以提供这种行为?本质上,我正在寻找一种对来自每个来源的事件进行优先级排序的方法,同时为每个来源提供公平的机会以供消费者处理。

4

2 回答 2

0

我建议构建您自己的结构来管理它,因为它特别为您的用例增加了灵活性(和速度)。

我会使用一个循环队列来存储每个 LIFO 队列(堆栈)。循环队列是将元素添加到尾部并从头部读取(但不删除)的队列。一旦头=尾,你就重新开始。

您可以使用简单的数组构建自己的队列。围绕操作管理同步并不难,例如向阵列添加更多队列 - 并在需要时扩展它。我相信向数组添加队列并不是你经常做的事情。

这很容易管理,您可以扩展循环队列以计算条目被访问的频率,并限制对其条目的访问频率(通过添加/删除消费者线程,甚至让它们在消费之前稍等片刻)由条目管理的堆栈)。

您甚至可以在使用多个线程从循环队列中读取元素时避免线程锁定,方法是让它们在从堆栈中消费之前调用“注册”操作:每个线程都有自己的 ID,当它“注册”时,ID 存储在给定的位置队列条目。在注册之前和出栈之前,线程会做一个“读取注册ID”的操作,返回的ID必须和自己的ID匹配。这意味着只有“拥有”给定队列条目的线程才能从该堆栈中弹出。如果注册过程的注册/确认失败,则意味着另一个线程正在从该条目中消费,因此当前线程移动到下一个可用条目。

我过去使用过这种策略,它的规模就像一个魅力。我希望这对你有意义。

于 2016-09-17T10:10:44.467 回答
0

您是否考虑过 LIFO 队列的 FIFO 队列?每个源都添加到其 LIFO 队列中,为了进行处理,您从 FIFO 队列中取出第一个 LIFO 队列,处理一个事件,然后将其放回 FIFO 队列。这样,您对新源也应该没有问题,因为它们的 LIFO 队列将简单地添加到 FIFO 队列中。

为了将事件添加到正确的 LIFO 队列,您可以维护一个额外的 HashMap,该 HashMap 知道每个源的队列,如果出现尚未在 Map 中的新源,您知道必须将其 LIFO 队列添加到 FIFO 队列。

于 2016-09-17T09:47:32.970 回答