2

我正在使用观察者模式和 BlockingQueue 来添加一些实例。现在在另一种方法中我正在使用队列,但似乎 take() 正在等待永远,即使我这样做是这样的:

/** {@inheritDoc} */
@Override
public void diffListener(final EDiff paramDiff, final IStructuralItem paramNewNode,
    final IStructuralItem paramOldNode, final DiffDepth paramDepth) {
    final Diff diff =
        new Diff(paramDiff, paramNewNode.getNodeKey(), paramOldNode.getNodeKey(), paramDepth);
    mDiffs.add(diff);
    try {
        mDiffQueue.put(diff);
    } catch (final InterruptedException e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mEntries++;

    if (mEntries == AFTER_COUNT_DIFFS) {
        try {
            mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
        } catch (final Exception e) {
            LOGWRAPPER.error(e.getMessage(), e);
        }
        mEntries = 0;
        mDiffs = new LinkedList<>();
    }
}

/** {@inheritDoc} */
@Override
public void diffDone() {
    try {
        mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs));
    } catch (final Exception e) {
        LOGWRAPPER.error(e.getMessage(), e);
    }
    mDone = true;
}

而 mDiffQueue 是一个 LinkedBlockingQueue ,我使用它是这样的:

while (!(mDiffQueue.isEmpty() && mDone) || mDiffQueue.take().getDiff() == EDiff.INSERTED) {}

但我认为第一个表达式被检查,而 mDone 不是真的,那么也许 mDone 设置为真(观察者总是多线程的?),但它已经在调用 mDiffQueue.take()?:-/

编辑:我现在真的不明白。我最近将其更改为:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
        if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
            break;
        }
    }
}

如果我在调试器中稍等片刻,它可以工作,但它也应该“实时”工作,因为 mDone 被初始化为 false,因此 while 条件应该为真,并且应该执行主体。

如果 mDiffQueue 为空且 mDone 为 true,则它应该跳过 while 循环的主体(这意味着队列不再填充)。

编辑:似乎是:

synchronized (mDiffQueue) {
    while (!(mDiffQueue.isEmpty() && mDone)) {
         if (mDiffQueue.peek() != null) {
             if (mDiffQueue.take().getDiff() != EDiff.INSERTED) {
                 break;
             }
         }
    }
}

即使我不明白为什么 peek() 是强制性的。

编辑:

我正在做的是遍历一棵树,我想跳过所有 INSERTED 节点:

for (final AbsAxis axis = new DescendantAxis(paramRtx, true); axis.hasNext(); axis.next()) {
    skipInserts();
    final IStructuralItem node = paramRtx.getStructuralNode();
    if (node.hasFirstChild()) {
        depth++;
        skipInserts();
        ...

基本上计算树中的最大深度或级别,而不考虑在树的另一个修订版中已删除的节点(用于比较 Sunburst 可视化),但是好的,这可能超出了范围。只是为了说明我正在对尚未插入的节点做一些事情,即使它只是调整最大深度。

问候,

约翰内斯

4

2 回答 2

2

take()是一个“阻塞呼叫”。这意味着它将阻塞(永远等待),直到队列中有东西,然后它将返回添加的内容。当然,如果有东西在队列中,它会立即返回。

您可以使用peek()来返回返回的内容take()- 即peek()返回下一个项目而不将其从队列中删除,或者null如果队列中没有任何内容则返回。尝试peek()在您的测试中使用代替(但也要检查 null )。

于 2011-09-07T11:51:15.153 回答
1

第一个建议:不要synchronized (mDiffQueue)synchronized如果 LinkedBlockingQueue 有一些方法,你会陷入僵局;这里不是这种情况,但这是一种你应该避免的做法。无论如何,我不明白你为什么要同步。

mDone在等待检查是否已设置时,您必须定期“唤醒” :

while (!(mDiffQueue.isEmpty()  && mDone)) {
   // poll returns null if nothing is added in the queue for 0.1 second.
   Diff diff = mDiffQueue.poll(0.1, TimeUnit.SECONDS); 
   if (diff != null)
      process(diff);
}

这与 using 大致相同peek,但peek基本上是等待一纳秒。usingpeek称为“忙等待”(您的线程不间断地运行 while 循环),而 usingpool称为“半忙等待”(您让线程每隔一段时间休眠)。

我想在你的情况下process(diff),如果diff不是 type ,就退出循环EDiff.INSERTED。我不确定这是否是你想要完成的。这看起来很奇怪,因为您基本上只是在停止消费者线程,直到您获得正确类型的单个元素,然后您什么也不做。而且由于您不在 while 循环中,因此您无法接收未来的传入元素。

于 2011-09-07T12:50:36.667 回答