1

我对以下代码片段有疑问。它旨在处理添加到事件队列 (ConcurrentLinkedQueue) 的事件(通过调用 processEvent 方法提供)。事件被添加到事件队列并在 run 方法中定期处理。

几乎总是一切都很好。但是有时在调用 processEvent 方法之后,当一个事件被添加到队列中时,运行部分无法看到有一个新事件。

知道什么是错的吗?除了使用字符串常量作为锁的明显错误之外?

import java.util.concurrent.ConcurrentLinkedQueue;

public class MyCommunicator implements Runnable {

private ConcurrentLinkedQueue<MyEvent> eventQueue = null;

private boolean stopped = false;

private String lock = "";
private Thread thread = null;

public MyCommunicator() {

    eventQueue = new ConcurrentLinkedQueue<MyEvent>();
}

public void start() {
    thread = new Thread(this, "MyCommunicatorThread");
    thread.start();
}

public void stop() {
    stopped = true;
    synchronized (lock) {
        lock.notifyAll();
    }
    eventQueue.clear();
}

public void run() {
    while (!stopped) {
        try {

            MyEvent event = null;
            while (!stopped && ((event = eventQueue.peek()) != null)) {
                sendEvent(event);
                eventQueue.poll();
            }

            if (!stopped) {
                synchronized (lock) {
                    lock.wait(10000L);
                }
            }
        }

        catch (Exception e) {

        }
    }
}

/**
 * START EVENT JOB - ADD A NEW EVENT TO BE PROCESSED
 */
public void processEvent(MyEvent event) {
    eventQueue.offer(event);
    synchronized (lock) {
        lock.notifyAll();
    }
}

/**
 * END EVENT JOB
 */
private void sendEvent(MyEvent event) {
    // do send event job
}

}
4

4 回答 4

11

为什么要使用锁和通知?

改用LinkedBlockingQueue并省去所有麻烦。

超时poll()将完成您尝试做的所有事情。


编辑:关于当前代码;

您需要定义"fails to see there is a new event"。您的run()方法每 10 秒查看一次队列;如果队列中有东西,它会“看到它”并将其拉出。

  • 如果您的意思是“在收到通知时不会立即看到它,只有 10 秒后”,那么这很容易回答,因为您有一个很容易导致这种情况发生的竞争条件。当该线程完成检查/处理队列和获取锁之间时,可以将某些东西插入队列。如果没有超时,wait()您将阻塞,直到插入下一个事件。如果stop()在此期间调用该方法,您将丢失队列中的所有事件。使用LinkedBlockingQueue而不是所有不必要的锁定和通知来解决这个问题。这不是一个“简单”的解决方案,它是这个用例和问题的正确解决方案。

  • 如果不是这种情况,那么您根本就没有在队列中插入任何东西,问题在于您没有在此处发布的代码。在对该代码一无所知的情况下,猜测是您试图MyEventeventQueue.offer(event). 由于您没有尝试/捕捉,因此offer()您不会知道。忽略所有异常而不检查返回值既不是一个好主意也不是一个好习惯。

  • 第三种可能性是您在某处有一些其他代码锁定在相同的确切内部字符串文字引用上,这将导致此代码挂起。你提到了它,但我会在这里重申 - 这是一件非常糟糕的事情,特别是考虑到它是空字符串。java.util.concurrent包提供带条件的真如果你坚持在这里使用它们。请注意,这仍然不会消除您有时会错过一个事件 10 秒的竞争条件,但它至少会更干净。为了消除您的竞争条件,您需要放弃并发队列而使用常规队列,并在访问它之前简单地获取锁(以及获取插入的锁)。这将同步您的线程,因为除非该线程正在等待锁定条件,否则将阻止插入器插入。在同一块代码中混合使用锁和无锁方法来进行线程同步通常会导致这些问题。

于 2011-04-15T20:44:48.857 回答
5

您有所谓的丢失信号。您轮询队列,然后在监视器上等待(获取锁)。生产者线程添加事件然后调用notifyAll()(获取锁)。事件队列/轮询和条件等待/通知之间没有happens-before关系。

因此线程A可以在空时轮询然后尝试获取锁,同时线程B添加一个元素并获取锁,通知所有等待的线程然后释放锁。线程 A 然后获取锁并等待它,但信号已丢失。

由于您将锁纯粹用于发送信号,您可能会考虑另一种机制,例如 Doug Lea 的新 jdk7 Phaser之类的可重用锁存器,或者直接使用 a BlockingQueue

或者,我们有几个ReusableLatch,例如用于单个读取器线程的 BooleanLatch 或用于多方支持的PhasedLatch 。

于 2011-04-17T07:14:42.510 回答
1

乍一看没有什么特别的想法,但在你不知情的情况下,任何事情都可能出错,因为:

    catch (Exception e) {

    }

捕获任何Exception(包括RuntimeException及其各种子类)然后忽略它的处理程序通常是一个坏主意。如果这是为了捕获特定类型的异常(例如,InterruptedException可能由 引发的异常lock.wait()),那么您应该将其限制为该异常类型。如果您有某种原因捕获任何异常,那么您至少应该在发生异常时记录一些内容。

于 2011-04-15T21:33:46.847 回答
0

我遇到的一个问题ConcurrentLinkedQueue是我真的怀疑这是一个真正的错误,因为它并不是真正同步的完整证明。

我还没有完全测试这个,但是我查看了代码,我很确定如果队列实际上是空的, .isEmpty() 是不同步的。当一个线程调用 .isEmpty() 并true返回时,队列可能已经包含元素。

于 2012-04-05T09:11:44.747 回答