1

这是我的课程,有两种方法可以修改 List PacketQueue。这两个方法在两个线程中执行,因此synchronize被标记。

public class MessageHandler implements nuctrl.interfaces.MessageHandler, Runnable {
    private static final List<GatewayMsg> PacketQueue = new LinkedList<GatewayMsg>();

    @Override
    public void insert(GatewayMsg msg) {
        synchronized (PacketQueue){
            PacketQueue.add(msg);
            PacketQueue.notify();
        }
        log.debug("insert " + msg.toString());
    }

    @Override
    public void run() {
        while(running){
            synchronized (PacketQueue){
                try {
                    while(PacketQueue.size() == 0){
                        PacketQueue.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
                for (GatewayMsg msg : PacketQueue){
                    PacketQueue.remove(msg);
                    packetHandler.onPacket(msg);//method call
                }
            }
        }
    }
}

run()用于线程 4insert()用于另一个线程(I/O Worker #1)。Synchronized已添加,一切似乎都很好,但我仍然不断收到 ConcurrentModificationException。

DEBUG [New I/O worker #1] (MessageHandler.java:47)| insert GatewayMsg<>
Exception in thread "Thread-4" java.util.ConcurrentModificationException
    at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:761)
    at java.util.LinkedList$ListItr.next(LinkedList.java:696)
    at nuctrl.core.MessageHandler.run(MessageHandler.java:67)
    at java.lang.Thread.run(Thread.java:680)

它现在让我发疯!任何人都可以帮助找出错误吗?或者其他方法来做同样的事情?

4

3 回答 3

4

ConcurrentModificationException如果同步代码在迭代期间修改集合,同步不会做任何事情,您可以在此处执行此操作:

for (GatewayMsg msg : PacketQueue){
    PacketQueue.remove(msg);     // <== Not allowed during iteration
    packetHandler.onPacket(msg);
}

在迭代期间,您只能通过删除元素Iterator,例如:

Iterator<GatewayMsg> it = PacketQueue.iterator();
while (it.hasNext()) {
    GatewayMsg msg = it.next();
    it.remove();                 // <== This is allowed, provided the collection supports it
    packetHandler.onPacket(msg);
}
于 2013-10-13T14:14:34.330 回答
4

这与同步无关 - 即使在单个线程中,这样的代码也会触发异常:

for (GatewayMsg msg : PacketQueue){
    PacketQueue.remove(msg);
    packetHandler.onPacket(msg);
}

这是因为您正在修改您正在迭代的集合。

要解决此问题,请在循环中使用列表迭代器,并调用迭代器的remove. 更好的是,循环处理所有项目,然后PacketQueue一次性清除所有项目,如下所示:

for (GatewayMsg msg : PacketQueue){
    packetHandler.onPacket(msg);
}
PacketQueue.clear();

这可以正常工作,因为访问PacketQueue是同步的:其他线程将看不到PacketQueue其部分消息已处理的状态,但它们仍保留在队列中。

于 2013-10-13T14:14:45.157 回答
0

您获得 CME 的原因是您在对其进行迭代时进行修改。库无法区分您的线程和修改它的另一个线程。


最简单的解决方案是不要自己编写此队列/线程处理代码。我会使用 ExecutorService 编写它

public class MessageHandler implements nuctrl.interfaces.MessageHandler {
    private static final ExecutorService EXEC = Executors.newSingleThreadExecutor();

    @Override
    public void insert(final GatewayMsg msg) {
        EXEC.submit(new Runnable() {
            @Override
            public void run() {
                packetHandler.onPacket(msg);//method call
            }
        });
        if(log.isDebugEnabled())
            log.debug("submitted " + msg);
    }

    public static void stop() {
        EXEC.shutdown();
    }
}

注意:这不需要包裹在线程中。

我检查是否启用了日志记录,因为生成不需要的字符串可能会慢得多,而且加速应用程序的最简单方法通常是避免它们。

于 2013-10-13T14:24:55.483 回答