0

我使用 java.nio 编写了一个非常简单的多播/UDP 发布者和侦听器,我正在通过执行以下操作来测试端到端延迟:

  • 在发布方面,我发送了一个包含当前时间戳的数据报,精度为微秒。
  • 在侦听方面,我只是解析该时间戳,然后获取当前时间戳并计算差异以获得端到端延迟。

请注意,我没有进行任何 CPU 固定,而是在 Mac OS 上运行 Java 11。我在环回接口上运行发布/订阅的 2 个进程,我得到的延迟范围为 100 到 400 微秒。我发现它非常高,我预计延迟会快一个数量级。我错过了什么吗?

num_points: 100, 50th: 167us, 70th: 183us, 99th: 369us, min: 109, max: 369us
[100 -      110 us] -
[110 -      120 us] ----
[120 -      130 us] ----
[130 -      140 us] --------------
[140 -      150 us] -----------
[150 -      160 us] ---------
[160 -      170 us] ------------
[170 -      180 us] ------------
[180 -      190 us] ----
[190 -      200 us] ----
[200 -      210 us] ------
[210 -      220 us] --
[220 -      230 us] ---
[230 -      240 us] -----
[240 -      250 us] -----
[250 -      260 us] 
[260 -      270 us] 
[270 -      280 us] -
[280 -      290 us] 
[290 -      300 us] 
[300 - Infinity us] ---

如果你想重现,我将代码添加到这篇文章中。要运行程序,参数如下:

// For the publisher
pub <iface_ip> <dst_group> <port> <millis_between_publication>
// For instance
pub 127.0.0.1 224.0.0.1 6000 50

// For the subscriber
sub <iface_ip> <dst_group> <port> <number_of_datapoints>
// For instance
sub 127.0.0.1 224.0.0.1 6000 100

这是代码:

import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class UDPPubSubBench {

    private static final int BUF_SIZE = 2048;

    public static void main(String[] args) {
        if (args.length != 5)
            throw new RuntimeException("Not enough arguments");

        final String mode = args[0];
        final String iface_ip = args[1];
        final String dst_group = args[2];
        final int port = Integer.parseInt(args[3]);
        final int val = Integer.parseInt(args[4]);
        System.out.printf("iface: %s, group: %s, port: %d\n", iface_ip, dst_group, port);

        try {
            if (mode.equals("pub")) { // Publisher - multicasts data over UPD
                runPublisher(iface_ip, dst_group, port, val);

            } else if (mode.equals("sub")) { // Subscriber - joins UDP multicast group and listen for datagrams
                runSubscriber(iface_ip, dst_group, port, val);

            } else {
                throw new RuntimeException("Invalid arguments!");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static void runPublisher(final String iface_ip, final String group, final int port,
                                     int cycle_millis) throws Exception {
        if (cycle_millis < 0)
            throw new RuntimeException("cycle_millis cannot be negative!");

        // Setting up UDP channel to publish datagram on multicast group
        final DatagramChannel channel_ = DatagramChannel.open(StandardProtocolFamily.INET);
        channel_.bind(null)
                .setOption(StandardSocketOptions.IP_MULTICAST_IF, NetworkInterface.getByInetAddress(InetAddress.getByName(iface_ip)))
                .setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true)
                .configureBlocking(false);

        final InetSocketAddress dst_group_ = new InetSocketAddress(group, port);
        final ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);

        // Endless loop - publishing a datagram every 2 seconds
        System.out.println("Running publisher...");
        while (true) {
            final long now_micros = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now());
            buf.putLong(now_micros).flip();
            if (channel_.send(buf, dst_group_) != 8) // Could not write the full long
                throw new RuntimeException("Socket not ready to write!");
            buf.clear();
            System.out.println("--> " + now_micros);

            Thread.sleep(cycle_millis);
        }
    }

    private static void runSubscriber(final String iface_ip, final String group, final int port,
                                      int num_data_points) throws Exception {
        if (num_data_points < 0)
            throw new RuntimeException("num_data_points cannot be negative!");

        // Setting up channel to join multicast group
        final NetworkInterface iface = NetworkInterface.getByInetAddress(InetAddress.getByName(iface_ip));
        final DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET)
                .setOption(StandardSocketOptions.SO_REUSEADDR, true)
                .bind(new InetSocketAddress(port))
                .setOption(StandardSocketOptions.IP_MULTICAST_IF, iface)
                .setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true);
        channel.socket().setReceiveBufferSize(BUF_SIZE);
        channel.configureBlocking(false);


        // Joining multicast group
        channel.join(InetAddress.getByName(group), iface);
        final Selector selector_ = Selector.open();
        channel.register(selector_, SelectionKey.OP_READ);

        final ByteBuffer rx_buf = ByteBuffer.allocate(BUF_SIZE);
        int data_points_idx = -5; // Skipping the first five data points

        // Loop reading from the UDP channel and printing estimated end-to-end latency
        // We're exiting this loop once we have
        while (data_points_idx < num_data_points) {
            if (selector_.selectNow() > 0) {
                while (channel.receive(rx_buf) != null) {
                    rx_buf.flip();
                    final long send_ts = rx_buf.getLong();
                    final long now_micros = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now());
                    final long latency = now_micros - send_ts;
                    System.out.println("<-- Latency " + latency + " micros");
                    rx_buf.clear();
                    ++data_points_idx;
                }

                // Need to do this otherwise the selector will always see the only channel I am dealing with as being
                // readable after having observed its first datagram
                selector_.selectedKeys().clear();
            }
        }
    }
}
4

0 回答 0