我必须使用 UDP 传输更大的文件(最大 10GB)。不幸的是,在这个用例中不能使用 TCP,因为发送者和接收者之间不可能进行双向通信。
发送文件不是问题。我已经使用netty编写了客户端。它读取文件,对其进行编码(唯一 ID、流中的位置等)并以可配置的速率(每秒数据包)将其发送到目的地。所有数据包都在目的地接收。我已经使用 iptables 和 Wireshark 来验证这一点。
问题出现在收件人身上。每秒接收多达 90K 个数据包工作得很好。但是以这种速率接收和解码它是不可能使用单个线程的。我的第一种方法是使用线程安全队列(一个生产者和多个消费者)。但是使用多个消费者并没有带来更好的结果。一些数据包仍然丢失。似乎开销(锁定/解锁队列)减慢了进程。所以我决定将 lmax 破坏器与单个生产者(接收 UDP 数据报)和多个消费者(解码数据包)一起使用。但令人惊讶的是,这也不会导致成功。使用两个 lmax 消费者几乎没有速度优势,我想知道为什么。
这是接收 UDP 数据包并调用中断器的主要部分
public void receiveUdpStream(DatagramChannel channel) {
boolean exit = false;
// the size of the UDP datagram
int size = shareddata.cr.getDatagramsize();
// the number of decoders (configurable)
int nn_decoders = shareddata.cr.getDecoders();
Udp2flowEventFactory factory = new Udp2flowEventFactory(size);
// the size of the ringbuffer
int bufferSize = 1 << 10;
Disruptor<Udp2flowEvent> disruptor = new Disruptor<>(
factory,
bufferSize,
DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE,
new YieldingWaitStrategy());
// my consumers
Udp2flowDecoder decoder[] = new Udp2flowDecoder[nn_decoders];
for (int i = 0; i < nn_decoders; i++) {
decoder[i] = new Udp2flowDecoder(i, shareddata);
}
disruptor.handleEventsWith(decoder);
RingBuffer<Udp2flowEvent> ringBuffer = disruptor.getRingBuffer();
Udp2flowProducer producer = new Udp2flowProducer(ringBuffer);
disruptor.start();
while (!exit) {
try {
ByteBuffer buf = ByteBuffer.allocate(size);
channel.receive(buf);
receivedDatagrams++; // countig the received packets
buf.flip();
producer.onData(buf);
} catch (Exception e) {
logger.debug("got exeception " + e);
exit = true;
}
}
}
我的 lmax事件很简单......
public class Udp2flowEvent {
ByteBuffer buf;
Udp2flowEvent(int size) {
this.buf = ByteBuffer.allocateDirect(size);
}
public void set(ByteBuffer buf) {
this.buf = buf;
}
public ByteBuffer getEvent() {
return this.buf;
}
}
这是我的工厂
public class Udp2flowEventFactory implements EventFactory<Udp2flowEvent> {
private int size;
Udp2flowEventFactory(int size) {
super();
this.size = size;
}
public Udp2flowEvent newInstance() {
return new Udp2flowEvent(size);
}
}
制作人...
public class Udp2flowProducer {
private final RingBuffer<Udp2flowEvent> ringBuffer;
public Udp2flowProducer(RingBuffer<Udp2flowEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer buf)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
Udp2flowEvent event = ringBuffer.get(sequence);
event.set(buf);
}
finally
{
ringBuffer.publish(sequence);
}
}
}
有趣但非常简单的部分是解码器。它看起来像这样。
public void onEvent(Udp2flowEvent event, long sequence, boolean endOfBatch) {
// each consumer decodes its packets
if (sequence % nn_decoders != decoderid) {
return;
}
ByteBuffer buf = event.getEvent();
event = null; // is it faster to null the event?
shareddata.increaseReceiveddatagrams();
// headertype
// some code omitted. But the code looks something like this
final int headertype = buf.getInt();
final int headerlength = buf.getInt();
final long payloadlength = buf.getLong();
// decoding int and longs works fine.
// but decoding the remaining part not!
byte[] payload = new byte[buf.remaining()];
buf.get(payload);
// some code omitted. The payload is used later on...
}
这里有一些有趣的事实:
- 所有解码器都运行良好。我看到正在运行的解码器数量
- 接收到所有数据包,但解码时间过长。更准确地说:解码前两个整数和长值工作正常,但解码有效负载需要太长时间。这会导致“背压”并丢失一些数据包。
- 有趣的事实:该代码在我的 MacBook Air 上运行良好,但在我的服务器上却无法运行。(MacBook:Core i7;服务器:在 Xeon @2.6Ghz 上具有 8 个虚拟内核且完全无负载的 ESXi)。
现在我的问题,我希望有人有一个想法:
- 为什么使用多个消费者几乎没有区别?差别只有5%
- 一般来说:接收 60K(或更多)UDP 数据包并对其进行解码的最佳方法是什么?我尝试将 netty 作为接收器,但 UDP 不能很好地扩展。
- 为什么解码有效载荷这么慢?
- 有没有我忽略的错误?
- 我应该使用另一个生产者/消费者库吗?LMAX 的延迟非常低,但吞吐量呢?