我目前正在开发一个 Java 项目,该项目在原始数据包通过网络时对其进行处理。数据由libpcap读入,然后将每个数据包放入一个byte[],然后放入一个ConcurrentLinkedQueue,线程从中读取并处理数据。
static ConcurrentLinkedQueue<byte[]> packetQueue = new ConcurrentLinkedQueue<byte[]>();
public void nextPacket(PcapHeader header, ByteBuffer buffer PcapDumper user) {
pcapBufferCapacity = buffer.capacity();
if (pcapBufferCapacity > 54) {
dumper.dump(header, buffer);
}
if (pcapBufferCapacity > 43) {
byte[] packetBytes = new byte[buffer.remaining()];
buffer.get(packetBytes);
packetQueue.add(packetBytes); // Data being added to the queue
}
}
然后我有一个正在运行的线程......
Thread thread = new Thread(new Parser()); // Parser extends Runnable
... poll() 是工作队列,然后处理数据。
int parserByteCount;
byte[] packetBytes;
public void run() {
packetBytes = MyClass.packetQueue.poll();
parserByteCount = packetBytes.length;
doStuff();
}
当我运行一个 Parser() 线程时,一切都按预期工作。但是,如果我运行多个解析器线程...
Thread thread2 = new Thread(new Parser());
...来自 MyClass.packetQueue.poll() 的数据已损坏,并且在从静态 PCAP 文件读取时我的结果不一致。考虑到它可以完美地与一个 Parser() 线程一起工作,但在多个 Parser() 线程运行时会损坏,我认为这与并发性有关。但是,由于数据是从 ConcurrentLinkedQueue 中放入/轮询的,它不应该在线程之间正常工作吗?我错过了什么?
感谢您的任何见解。