0

我必须使用 UDP 传输更大的文件(最大 10GB)。不幸的是,在这个用例中不能使用 TCP,因为发送者和接收者之间不可能进行双向通信。

发送文件不是问题。我已经使用netty编写了客户端。它读取文件,对其进行编码(唯一 ID、流中的位置等)并以可配置的速率(每秒数据包)将其发送到目的地。所有数据包都在目的地接收。我已经使用 iptables 和 Wireshark 来验证这一点。

问题出现在收件人身上。每秒接收多达 90K 个数据包工作得很好。但是以这种速率接收解码它是不可能使用单个线程的。我的第一种方法是使用线程安全队列(一个生产者和多个消费者)。但是使用多个消费者并没有带来更好的结果。一些数据包仍然丢失。似乎开销(锁定/解锁队列)减慢了进程。所以我决定将 lmax 破坏器与单个生产者(接收 UDP 数据报)和多个消费者(解码数据包)一起使用。但令人惊讶的是,这也不会导致成功。使用两个 lmax 消费者几乎没有速度优势,我想知道为什么。

这是接收 UDP 数据包并调用中断器的主要部分

    public void receiveUdpStream(DatagramChannel channel) {
        boolean exit = false;

        // the size of the UDP datagram
        int size = shareddata.cr.getDatagramsize();
        // the number of decoders (configurable)
        int nn_decoders = shareddata.cr.getDecoders();

        Udp2flowEventFactory factory = new Udp2flowEventFactory(size);
        // the size of the ringbuffer
        int bufferSize = 1 << 10;
        Disruptor<Udp2flowEvent> disruptor = new Disruptor<>(
                factory,
                bufferSize,
                DaemonThreadFactory.INSTANCE,
                ProducerType.SINGLE,
                new YieldingWaitStrategy());

        // my consumers
        Udp2flowDecoder decoder[] = new Udp2flowDecoder[nn_decoders];
        for (int i = 0; i < nn_decoders; i++) {
            decoder[i] = new Udp2flowDecoder(i, shareddata);
        }
        disruptor.handleEventsWith(decoder);

        RingBuffer<Udp2flowEvent> ringBuffer = disruptor.getRingBuffer();
        Udp2flowProducer producer = new Udp2flowProducer(ringBuffer);
        disruptor.start();

        while (!exit) {
            try {
                ByteBuffer buf = ByteBuffer.allocate(size);
                channel.receive(buf);
                receivedDatagrams++; // countig the received packets
                buf.flip();
                producer.onData(buf);
            } catch (Exception e) {
                logger.debug("got exeception " + e);
                exit = true;
            }
       }
    }

我的 lmax事件很简单......

public class Udp2flowEvent {
    
    ByteBuffer buf;
    
    Udp2flowEvent(int size) {
        this.buf = ByteBuffer.allocateDirect(size);
    }
 
    public void set(ByteBuffer buf) {
        this.buf = buf;
    }
    
    public ByteBuffer getEvent() {
        return this.buf;
    }
}

这是我的工厂

public class Udp2flowEventFactory implements EventFactory<Udp2flowEvent> {
    
    private int size;
    
    Udp2flowEventFactory(int size) {
        super();
        this.size = size;
    }
    
    public Udp2flowEvent newInstance() {
        return new Udp2flowEvent(size);
    }
}

制作...

public class Udp2flowProducer {
    
    private final RingBuffer<Udp2flowEvent> ringBuffer;
    
    public Udp2flowProducer(RingBuffer<Udp2flowEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }
    
    public void onData(ByteBuffer buf)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            Udp2flowEvent event = ringBuffer.get(sequence); 
            event.set(buf);
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }   
}

有趣但非常简单的部分是解码器。它看起来像这样。

    public void onEvent(Udp2flowEvent event, long sequence, boolean endOfBatch) {

        // each consumer decodes its packets
        if (sequence % nn_decoders != decoderid) {
            return;
        }

        ByteBuffer buf = event.getEvent();
        event = null; // is it faster to null the event?
        shareddata.increaseReceiveddatagrams();

        // headertype
        // some code omitted. But the code looks something like this
        final int headertype = buf.getInt();
        final int headerlength = buf.getInt();
        final long payloadlength = buf.getLong();
        // decoding int and longs works fine.
        // but decoding the remaining part not!
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        // some code omitted. The payload is used later on...
    }

这里有一些有趣的事实:

  • 所有解码器都运行良好。我看到正在运行的解码器数量
  • 接收到所有数据包,但解码时间过长。更准确地说:解码前两个整数和长值工作正常,但解码有效负载需要太长时间。这会导致“背压”并丢失一些数据包。
  • 有趣的事实:该代码在我的 MacBook Air 上运行良好,但在我的服务器上却无法运行。(MacBook:Core i7;服务器:在 Xeon @2.6Ghz 上具有 8 个虚拟内核且完全无负载的 ESXi)。

现在我的问题,我希望有人有一个想法:

  • 为什么使用多个消费者几乎没有区别?差别只有5%
  • 一般来说:接收 60K(或更多)UDP 数据包并对其进行解码的最佳方法是什么?我尝试将 netty 作为接收器,但 UDP 不能很好地扩展。
  • 为什么解码有效载荷这么慢?
  • 有没有我忽略的错误?
  • 我应该使用另一个生产者/消费者库吗?LMAX 的延迟非常低,但吞吐量呢?
4

1 回答 1

1

环形缓冲区似乎不是解决这个问题的正确技术,因为当环形缓冲区填满它的所有容量时,它会阻塞,而且它也是一种固有的顺序架构。您需要提前知道预期的最大数据包数量和大小。除非您实施消息保证协议,否则 UDP 也是有损的。

不知道你为什么说 TCP 不是双向的,它是并且它会处理丢失的数据包。

为了应对数据泛滥,如果单个数据包不足,您可能需要将传入数据包分发到单独的服务器。队列应该能够吸收大量数据。如果您想近乎实时地处理大量数据,您可能需要等待大量解码器。

建议你使用 TCP。

于 2021-06-24T06:07:45.763 回答