我使用 java.nio 编写了一个非常简单的多播/UDP 发布者和侦听器,我正在通过执行以下操作来测试端到端延迟:
- 在发布端,我发送一个数据报,其中包含微秒精度的当前时间戳。
- 在订阅端,我解析出该时间戳,再获取当前时间戳,两者之差即为端到端延迟。
请注意,我没有做任何 CPU 绑定(CPU pinning),运行环境是 Mac OS 上的 Java 11。发布者和订阅者作为 2 个进程在环回接口上运行,测得的延迟在 100 到 400 微秒之间。我觉得这个数值非常高,我原本预计延迟应该低一个数量级。我是不是遗漏了什么?
num_points: 100, 50th: 167us, 70th: 183us, 99th: 369us, min: 109, max: 369us
[100 - 110 us] -
[110 - 120 us] ----
[120 - 130 us] ----
[130 - 140 us] --------------
[140 - 150 us] -----------
[150 - 160 us] ---------
[160 - 170 us] ------------
[170 - 180 us] ------------
[180 - 190 us] ----
[190 - 200 us] ----
[200 - 210 us] ------
[210 - 220 us] --
[220 - 230 us] ---
[230 - 240 us] -----
[240 - 250 us] -----
[250 - 260 us]
[260 - 270 us]
[270 - 280 us] -
[280 - 290 us]
[290 - 300 us]
[300 - Infinity us] ---
如果你想复现这个结果,我已把代码附在本文中。运行程序所需的参数如下:
// For the publisher
pub <iface_ip> <dst_group> <port> <millis_between_publication>
// For instance
pub 127.0.0.1 224.0.0.1 6000 50
// For the subscriber
sub <iface_ip> <dst_group> <port> <number_of_datapoints>
// For instance
sub 127.0.0.1 224.0.0.1 6000 100
这是代码:
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
/**
 * Minimal UDP multicast publisher/subscriber pair used to estimate end-to-end latency.
 *
 * <p>The publisher sends datagrams whose payload is a single {@code long}: the wall-clock
 * send time in microseconds since the epoch. The subscriber reads that value, takes its own
 * wall-clock timestamp and prints the difference.
 *
 * <p>Usage:
 * <pre>
 *   pub &lt;iface_ip&gt; &lt;dst_group&gt; &lt;port&gt; &lt;millis_between_publication&gt;
 *   sub &lt;iface_ip&gt; &lt;dst_group&gt; &lt;port&gt; &lt;number_of_datapoints&gt;
 * </pre>
 */
public class UDPPubSubBench {
    /** Capacity of the send/receive byte buffers and requested socket receive buffer size. */
    private static final int BUF_SIZE = 2048;

    /** Number of leading datagrams the subscriber prints but does not count as data points. */
    private static final int WARMUP_DATAGRAMS = 5;

    /**
     * Parses the command line and runs either the publisher or the subscriber.
     *
     * @param args {@code <pub|sub> <iface_ip> <dst_group> <port> <value>}, where {@code value}
     *             is the publication interval in milliseconds for {@code pub} and the number
     *             of data points to collect for {@code sub}
     * @throws RuntimeException if the number of arguments is not exactly five
     * @throws NumberFormatException if {@code port} or {@code value} is not an integer
     */
    public static void main(String[] args) {
        if (args.length != 5) {
            throw new IllegalArgumentException(
                    "Expected 5 arguments: <pub|sub> <iface_ip> <dst_group> <port> <value>, got "
                            + args.length);
        }
        final String mode = args[0];
        final String ifaceIp = args[1];
        final String dstGroup = args[2];
        final int port = Integer.parseInt(args[3]);
        final int val = Integer.parseInt(args[4]);
        System.out.printf("iface: %s, group: %s, port: %d\n", ifaceIp, dstGroup, port);
        try {
            if (mode.equals("pub")) { // Publisher - multicasts data over UDP
                runPublisher(ifaceIp, dstGroup, port, val);
            } else if (mode.equals("sub")) { // Subscriber - joins the multicast group and listens
                runSubscriber(ifaceIp, dstGroup, port, val);
            } else {
                throw new RuntimeException("Invalid arguments!");
            }
        } catch (Exception ex) {
            // Top-level boundary of a command-line tool: report the failure and return.
            ex.printStackTrace();
        }
    }

    /**
     * Publishes the current timestamp to the multicast group forever, pausing
     * {@code cycleMillis} milliseconds between datagrams.
     *
     * @param ifaceIp     IP address identifying the outgoing network interface
     * @param group       destination multicast group address
     * @param port        destination UDP port
     * @param cycleMillis pause between two publications, in milliseconds; must not be negative
     * @throws Exception if the interface cannot be resolved, the channel cannot be set up,
     *                   a datagram cannot be sent in full, or the thread is interrupted
     */
    private static void runPublisher(final String ifaceIp, final String group, final int port,
                                     final int cycleMillis) throws Exception {
        if (cycleMillis < 0) {
            throw new IllegalArgumentException("cycle_millis cannot be negative!");
        }
        // Resolve everything that can fail on bad input before opening any socket.
        final NetworkInterface iface = resolveInterface(ifaceIp);
        final InetSocketAddress destination = new InetSocketAddress(group, port);
        final ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);

        // try-with-resources: the channel is closed even when the loop exits via an exception.
        try (DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET)) {
            channel.bind(null)
                    .setOption(StandardSocketOptions.IP_MULTICAST_IF, iface)
                    .setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true)
                    .configureBlocking(false);

            // Endless loop - publishing one datagram every cycleMillis milliseconds
            System.out.println("Running publisher...");
            while (true) {
                final long nowMicros = currentTimeMicros();
                buf.clear();
                buf.putLong(nowMicros).flip();
                // A non-blocking send returns 0 when the datagram could not be sent right away.
                if (channel.send(buf, destination) != Long.BYTES) {
                    throw new RuntimeException("Socket not ready to write!");
                }
                System.out.println("--> " + nowMicros);
                Thread.sleep(cycleMillis);
            }
        }
    }

    /**
     * Joins the multicast group and prints the latency of every received timestamp datagram.
     *
     * <p>The first {@link #WARMUP_DATAGRAMS} datagrams are printed but not counted; the method
     * returns once {@code numDataPoints} further datagrams have been processed. The selector is
     * polled with {@code selectNow()} in a tight loop, so the calling thread never blocks while
     * waiting for data.
     *
     * @param ifaceIp       IP address identifying the network interface to join on
     * @param group         multicast group address to join
     * @param port          UDP port to listen on
     * @param numDataPoints number of data points to collect after warm-up; must not be negative
     * @throws Exception if the interface cannot be resolved or any channel operation fails
     */
    private static void runSubscriber(final String ifaceIp, final String group, final int port,
                                      final int numDataPoints) throws Exception {
        if (numDataPoints < 0) {
            throw new IllegalArgumentException("num_data_points cannot be negative!");
        }
        // Resolve everything that can fail on bad input before opening any socket.
        final NetworkInterface iface = resolveInterface(ifaceIp);
        final InetAddress groupAddress = InetAddress.getByName(group);
        final ByteBuffer rxBuf = ByteBuffer.allocate(BUF_SIZE);

        // try-with-resources: selector and channel are closed on normal exit and on failure.
        try (DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET);
             Selector selector = Selector.open()) {
            // Setting up channel to join multicast group
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true)
                    .bind(new InetSocketAddress(port))
                    .setOption(StandardSocketOptions.IP_MULTICAST_IF, iface)
                    .setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true);
            channel.socket().setReceiveBufferSize(BUF_SIZE);
            channel.configureBlocking(false);

            // Joining multicast group
            channel.join(groupAddress, iface);
            channel.register(selector, SelectionKey.OP_READ);

            // Starts below zero so the warm-up datagrams are not counted as data points.
            int dataPointsIdx = -WARMUP_DATAGRAMS;

            // Loop reading from the UDP channel and printing the estimated end-to-end latency.
            // We exit once numDataPoints datagrams have been counted after the warm-up.
            while (dataPointsIdx < numDataPoints) {
                if (selector.selectNow() > 0) {
                    // Drain what is queued, but never count past the requested number.
                    while (dataPointsIdx < numDataPoints && channel.receive(rxBuf) != null) {
                        rxBuf.flip();
                        if (rxBuf.remaining() >= Long.BYTES) {
                            final long sendTs = rxBuf.getLong();
                            final long latency = currentTimeMicros() - sendTs;
                            System.out.println("<-- Latency " + latency + " micros");
                            ++dataPointsIdx;
                        } else {
                            // Not one of our timestamp datagrams; skip it instead of failing
                            // with BufferUnderflowException.
                            System.err.println("Ignoring datagram of " + rxBuf.remaining()
                                    + " bytes: too short to hold a timestamp");
                        }
                        rxBuf.clear();
                    }
                    // The selected-key set is never cleared by the selector itself; without this
                    // the channel would be reported as readable forever after its first datagram.
                    selector.selectedKeys().clear();
                }
            }
        }
    }

    /**
     * Looks up the network interface that has the given IP address bound to it.
     *
     * @param ifaceIp IP address (or host name) of the interface
     * @return the matching interface, never {@code null}
     * @throws UnknownHostException     if {@code ifaceIp} cannot be resolved to an address
     * @throws SocketException          if the interfaces cannot be queried
     * @throws IllegalArgumentException if no interface has that address bound to it
     */
    private static NetworkInterface resolveInterface(final String ifaceIp)
            throws UnknownHostException, SocketException {
        final NetworkInterface iface =
                NetworkInterface.getByInetAddress(InetAddress.getByName(ifaceIp));
        if (iface == null) {
            throw new IllegalArgumentException(
                    "No network interface is bound to address " + ifaceIp);
        }
        return iface;
    }

    /** Returns the current wall-clock time in microseconds since the epoch. */
    private static long currentTimeMicros() {
        return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now());
    }
}