3

我有一个线程池,它们从队列中获取任务。通常,少量线程能够保持队列为空。偶尔,特别大的突发事件会使队列大小在一段时间内保持在零以上,但不会持续很长时间。

我担心的是重复的事件或携带过时的数据的事件。在高容量期间,此类事件可能会在队列中并存一小段时间。我希望能够将这些混为一谈,这样我就可以减少浪费在工作上的时间。

合并这样一个队列的好方法是什么?我可以在插入时通过从头到尾迭代并搜索要替换的候选者来合并,但这似乎太暴力了。如果您有代码或库建议,请记住我使用的是 Java。

4

3 回答 3

5

为什么不根据您的任务实现 hashCode() 和 equals() 。然后只需删除任务。例如。

queue.remove(task);
queue.offer(task);

那么你将不会有重复。或者。

if(!queue.contains(task)) {
   queue.offer(task);
}

如果它已经在队列中,这将避免将任务排队。

于 2009-04-13T11:44:42.003 回答
2

如果使用LinkedHashMap,则可以保留条目添加到队列中的顺序。

当匹配条目到达时,我推测您希望将其一些数据附加到原始队列条目。在这种情况下,您可以就地更新散列对象,或者使用HashMap.put(key, value)新对象替换排队的项目。(我认为这保留了原始项目的顺序,但我没有对此进行测试。)

LinkedHashMap请注意,您的代码将需要显式同步对其内部数据的读写访问。您不希望在另一个线程抓取它进行处理的同时更新队列中的项目。最简单的同步方法可能是通过LinkedHashMap访问Collections.synchronizedMap().

于 2009-04-13T21:09:07.403 回答
2

这个 conflator 似乎可以满足您的需求: https ://github.com/GuillaumeArnaud/conflator

根据您的要求,如果合并队列中存在一个现有事件,则可以将实现更改为合并或用现有事件替换最新事件。

例如。对于以下内容,每个事件都被实现为一个“Tick”,它定义了合并行为。

public class Tick implements Message<Tick> {

    private final String ticker;

    public long getInitialQuantity() {
        return initialQuantity;
    }

    private final long initialQuantity;

    public long getCurrentQuantity() {
        return currentQuantity;
    }

    private long currentQuantity;
    private int numberOfMerges;

    public String getTicker() {
        return ticker;
    }

    public Tick(String ticker, long quantity) {
        this.ticker = ticker;
        this.initialQuantity = quantity;
        this.currentQuantity = quantity;
    }

    @Override
    public String key() {
        return this.ticker;
    }

    @Override
    public String body() {
        return String.valueOf(currentQuantity);
    }

    @Override
    public boolean isMerged() {
        return this.initialQuantity != this.currentQuantity;
    }

    @Override
    public int mergesCount() {
        return numberOfMerges;
    }

    @Override
    public boolean isValid() {
        return false;
    }

    @Override
    public boolean merge(Tick message) {
        if (this.equals(message)) {
            this.currentQuantity += message.currentQuantity;
            numberOfMerges++;
            return true;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return ticker.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj != null && obj instanceof Tick) {
            Tick other = (Tick) obj;
            return this.ticker.equals(other.getTicker());
        }
        return false;
    }

测试用例:

public class TickMergeTest {
    MultiValuedMapConflator conflator;

    @Test
    public void two_unmergeable_ticks_should_be_remain_unmergeable() {
        Tick tick1 = new Tick("GOOG", 100L);
        Tick tick2 = new Tick("AAPL", 120L);

        List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2));

        assertNotNull(messages);
        assertEquals(messages.size(), 2);
        assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getCurrentQuantity());
        assertEquals(Long.valueOf(messages.get(1).body()).longValue(), tick2.getCurrentQuantity());
    }

    @Test(timeout = 1000)
    public void two_mergeable_ticks_should_be_merged() {
        Tick tick1 = new Tick("GOOG", 100L);
        Tick tick2 = new Tick("GOOG", 120L);

        List<Tick> messages = conflator.merge(Lists.newArrayList(tick1, tick2));

        assertNotNull(messages);
        assertEquals(messages.size(), 1);
        assertEquals(Long.valueOf(messages.get(0).body()).longValue(), tick1.getInitialQuantity() + tick2.getInitialQuantity());
    }

    @Test(timeout = 1000)
    public void should_merge_messages_on_same_key() throws InterruptedException {
        // given
        conflator.put(new Tick("GOOG", 100L));
        conflator.put(new Tick("GOOG", 120L));

        // test
        Thread.sleep(300); // waiting the conflation
        Message message = conflator.take();

        // check
        assertNotNull(message);
        assertEquals(Long.valueOf(message.body()).longValue(), 220L);
        assertTrue(message.isMerged());
    }

    @Test(timeout = 1000)
    public void should_not_merge_messages_on_diff_key() throws InterruptedException {
        // given
        conflator.put(new Tick("GOOG", 100L));
        conflator.put(new Tick("AAPL", 120L));

        // test
        Thread.sleep(300); // waiting the conflation
        Message message1 = conflator.take();
        Message message2 = conflator.take();

        // check
        assertNotNull(message1);
        assertNotNull(message2);

        assertEquals(Long.valueOf(message1.body()).longValue(), 100L);
        assertFalse(message1.isMerged());

        assertEquals(Long.valueOf(message2.body()).longValue(), 120L);
        assertFalse(message2.isMerged());

    }
    @Before
    public void setUp() {
        conflator = new MultiValuedMapConflator<Tick>(true);
    }

    @After
    public void tearDown() {
        conflator.stop();
    }
}
于 2016-10-14T07:06:00.987 回答