2

我目前正在开发一个 Java 项目,该项目在原始数据包通过网络时对其进行处理。数据由libpcap读入,然后将每个数据包放入一个byte[],然后放入一个ConcurrentLinkedQueue,线程从中读取并处理数据。

    static ConcurrentLinkedQueue<byte[]> packetQueue = new ConcurrentLinkedQueue<byte[]>();

    public void nextPacket(PcapHeader header, ByteBuffer buffer PcapDumper user) {
        pcapBufferCapacity = buffer.capacity();

        if (pcapBufferCapacity > 54) {
            dumper.dump(header, buffer);
        }

        if (pcapBufferCapacity > 43) {
            byte[] packetBytes = new byte[buffer.remaining()];
            buffer.get(packetBytes);
            packetQueue.add(packetBytes); // Data being added to the queue  
        }
    }

然后我有一个正在运行的线程......

Thread thread = new Thread(new Parser()); // Parser extends Runnable

... poll() 是工作队列,然后处理数据。

int parserByteCount;
byte[] packetBytes;

public void run() {
    packetBytes = MyClass.packetQueue.poll();
    parserByteCount = packetBytes.length;
    doStuff();
}

当我运行一个 Parser() 线程时,一切都按预期工作。但是,如果我运行多个解析器线程...

Thread thread2 = new Thread(new Parser());

...来自 MyClass.packetQueue.poll() 的数据已损坏,并且在从静态 PCAP 文件读取时我的结果不一致。考虑到它可以完美地与一个 Parser() 线程一起工作,但在多个 Parser() 线程运行时会损坏,我认为这与并发性有关。但是,由于数据是从 ConcurrentLinkedQueue 中放入/轮询的,它不应该在线程之间正常工作吗?我错过了什么?

感谢您的任何见解。

4

2 回答 2

2
int parserByteCount;
byte[] packetBytes;

这些是你Parser类的成员变量吗?它们是否需要在内部进行正确解析doStuff()

如果您两次回答“是”,您可能会遇到麻烦。请记住,线程共享内存。因此,必须同步对可以同时访问的所有实例或类级别变量的访问。请注意,局部变量不是共享的——它们存在于堆栈中,这对于每个线程都是唯一的。一个简单的解决方法是将两个关键变量更改为局部变量,并在必要时将它们作为方法参数传递。

于 2013-10-08T20:04:25.457 回答
0

ConcurrentLinkedQueue poll() 产生损坏的数据?

如果这意味着它ConcurrentLinkedQueue有过错,那么这是极不可能的。我怀疑这要么是您将数据放入队列的方式,要么是您如何使用它。

byte[] packetBytes = new byte[buffer.remaining()];
buffer.get(packetBytes);
packetQueue.add(packetBytes); // Data being added to the queue  

这段代码看起来不错。我担心您正在重新使用 abyte[]但我认为这里没有问题。

packetBytes = MyClass.packetQueue.poll();
parserByteCount = packetBytes.length;
doStuff();

这看起来不错,除了您应该检查的poll()可能返回。null您可能会考虑切换到LinkedBlockingQueue允许更多方法来等待队列中的条目。但这并不能解决您的问题。

这留下了一些问题doStuff()

  • 是否正在doStuff()更新一些不存在的常见对象synchronized?也许是输出文件或其他一些持久性机制?
  • 您应该传递packetBytesdoStuff()而不是使其成为实例变量吗?
  • 您处理的任何部分是否packetBytes不恰当地修改了数组?不知道为什么这适用于一个线程而不是两个线程。

来自 MyClass.packetQueue.poll() 的数据已损坏,我的结果不一致

你能更明确地说一下“腐败”吗?System.out.println(...)当它们被添加到队列和被删除时,你能做一个并转储数组中的值吗?我想你会发现队列不是问题。

于 2013-10-08T20:01:55.300 回答