10

我正在使用 RXTX 从串行端口读取数据。读取是在以下列方式产生的线程内完成的:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

SerialReader 类实现 Runnable 并且只是无限循环,从端口读取数据并将数据构造成有用的包,然后再将其发送到其他应用程序。但是,我已将其简化为以下简单性:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

当用户单击停止按钮时,会触发以下功能,理论上应该关闭输入流并打破阻塞的 byteChan.read(buffer) 调用。代码如下:

public void stop() {
  t.interrupt();
  serial.close();
}

但是,当我运行此代码时,我永远不会收到 ClosedByInterruptException,一旦输入流关闭,它应该触发。此外,在调用 serial.close() 时执行会阻塞——因为底层输入流仍然阻塞在 read 调用上。我尝试用 byteChan.close() 替换中断调用,这应该会导致 AsynchronousCloseException,但是,我得到了相同的结果。

对我所缺少的任何帮助将不胜感激。

4

3 回答 3

9

您不能简单地将不支持可中断 I/O 的流InterruptibleChannel包装成一个流(而且,无论如何,ReadableByteChannel不​​扩展InterruptibleChannel)。

你必须看一下标的的合约InputStreamSerialPort.getInputStream()关于其结果的可中断性有什么说法?如果它什么也没说,你应该假设它忽略了中断。

对于任何不明确支持可中断性的 I/O,唯一的选择通常是从另一个线程关闭流。这可能会立即在调用流时阻塞的线程中引发一个IOException(尽管它可能不是)。AsynchronousCloseException

然而,即使这也非常依赖于InputStream- 底层操作系统的实现也可能是一个因素。


ReadableByteChannelImpl请注意对返回的类的源代码注释newChannel()

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮
于 2010-10-01T22:22:38.917 回答
5

RXTX SerialInputStream(由 serial.getInputStream() 调用返回的内容)支持超时方案,最终解决了我的所有问题。在创建新的 SerialReader 对象之前添加以下内容会导致读取不再无限期阻塞:

serial.enableReceiveTimeout(1000);

在 SerialReader 对象中,我不得不更改一些东西以直接从 InputStream 读取而不是创建 ReadableByteChannel,但现在,我可以停止并重新启动阅读器而不会出现问题。

于 2010-10-13T17:31:43.113 回答
1

我正在使用下面的代码来关闭 rxtx。我运行启动它们并关闭它们的测试,似乎工作正常。我的读者看起来像:

private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

谢谢

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }
于 2010-11-30T08:50:24.480 回答