-1

我有一些代码从多播套接字读取数据,直到用户定义的结束时间。Thread.interrupt如果线程通过调用(或您可以提出的任何其他用户启动的操作)中断,我也想停止读取数据。我不知道如何在线程中断时获得通知。现有代码如下:

// These are the constants I am given
final int         mcastPort = ...;
final InetAddress mcastIP   = ...;
final InetAddress ifaceIP   = ...; // Null indicates all interfaces should be used
final Instant     endTime   = ...; // Time at which to stop processing

// Initialize a datagram
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

// Process incoming datagram packets
try (final MulticastSocket socket = new MulticastSocket(port)) {
    socket.joinGroup(mcastIP);
    if (ifaceIP != null)
        socket.setInterface(ifaceIP);

    do {
        final Duration soTimeout = Duration.between(Instant.now(), endTime);
        socket.setSoTimeout(soTimeout);
        socket.receive(packet);

        // Process packet
        ...
    } while (true);
} catch (final SocketTimeoutException e) {
    // Normal condition... the recording time has ended
} catch (final ClosedByInterruptException e) {
    // Uh-oh... this never happens
} ...

我看到有一个DatagramSocket.getChannel返回 a 的方法DatagramChannel,所以我自然假设该类型用于读取/写入底层套接字。这个假设是不正确的,这意味着MulticastSocket没有实现InterruptibleChannel. 因此,MulticastSocket.receive从不抛出ClosedByInterruptException.

我在网上搜索了示例,但无法弄清楚如何修改上述代码以使用 aDatagramChannel而不是MulticastSocket. 我需要帮助的问题是:

  1. 如何在 DatagramChannel 上设置 SO_TIMEOUT 参数?
  2. 如何将a 转换InetAddressNetworkInterface对象?

以下是我对如何将实现从 转换MulticastSocketDatagramChannel以满足我的要求的最佳猜测:

// Initialize a buffer
final ByteBuffer buffer = ByteBuffer.allocate(1000);

try (final DatagramChannel mcastChannel = DatagramChannel.open()) {
    mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    mcastChannel.connect(new InetSocketAddress(port));
    mcastChannel.join(mcastIP);
    if (ifaceIP != null)
        // HELP: this option requires an InterfaceAddress object,
        //       but I only have access to an InetAddress object
        mcastChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ifaceIP);

    do {
        final Duration soTimeout = Duration.between(Instant.now(), endTime);
        // HELP: SO_TIMEOUT is not a member of StandardSocketOptions
        mcastChannel.setOption(SO_TIMEOUT, ???);
        mcastChannel.receive(buffer);

        // Process packet
        ...
    } while (true);
} ...

这种方法甚至会奏效吗? DatagramChannel.receive没有SocketTimeoutException列为它能够抛出的异常之一。如果这可行,那么请让我知道我需要如何将第二个实现更改为与第一个实现等效,但能够ClosedByInterruptException在客户端调用Thread.interrupt. 如果没有,那么是否有人对我如何满足在预定义时间停止数据报接收的要求有任何其他想法,同时还提供一种通过用户交互停止执行的方法?

4

2 回答 2

1

如何在 DatagramChannel 上设置 SO_TIMEOUT 参数?

通过调用channel.socket().setSoTimeout().

如何将a 转换InetAddressNetworkInterface对象?

您枚举网络接口,直到找到具有所需地址的网络接口。

DatagramChannel.receive()没有列出SocketTimeoutException

它不必。它列出IOExceptionSocketTimeoutException扩展IOException

您的第二段代码应该调用bind(),而不是调用,connect()它应该在调用之前设置接口join(),而不是之后。除此之外,一旦您修复了网络接口问题,它应该可以按预期工作,并且ClosedByInterruptException如果中断它会抛出。

于 2017-03-16T01:48:11.890 回答
0

最终的工作代码是:

// Create a datagram packet used to read multicast data from the socket
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

// Process incoming datagram packets
try (final DatagramChannel mcastChannel =
        DatagramChannel.open(StandardProtocolFamily.INET)) {

    // Set the appropriate parameters on the socket
    mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    mcastChannel.bind(new InetSocketAddress(port));
    if (ifaceIP == null) {
        // Call join on each NetworkInterface that supports IPv4 multicast
    } else {
        mcastChannel.join(mcastIP, NetworkInterface.getByInetAddress(ifaceIP));
    }

    final DatagramSocket socket = mcastChannel.socket();

    do {
        final Duration timeToEnd = Duration.between(Instant.now(), endTime);
        if (timeToEnd.compareTo(Duration.ZERO) < 0) break;

        socket.setSoTimeout((int)timeToEnd.toMillis());
        socket.receive(packet);

        // Process packet
        ...
    } while (true);
} catch (final SocketTimeoutException e) {
    // The end time has passed
} catch (final ClosedByInterruptException e) {
    // The user initiated the closure
} ...

注意事项:

  1. 调用socket.receive. 如果将其更改为mcastChannel.receive,您将永远无法获得SocketTimeoutException.
  2. 对 的调用bind发生在对 的调用之前join。您可以join在尽可能多的不同网络接口上调用任意多次。
于 2017-03-16T02:47:29.020 回答