线程中断没有结束阻塞调用输入流读取

我正在使用RXTX从串口读取数据。 读取是在以下列方式生成的线程内完成的:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port); CommPort comm = portIdentifier.open("Whatever", 2000); SerialPort serial = (SerialPort)comm; ...settings Thread t = new Thread(new SerialReader(serial.getInputStream())); t.start(); 

SerialReader类实现Runnable并且无限循环,从端口读取并将数据构建到有用的包中,然后再将其发送到其他应用程序。 但是,我把它简化为以下简单:

 public void run() { ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader ByteBuffer buffer = ByteBuffer.allocate(100); while (true) { try { byteChan.read(buffer); } catch (Exception e) { System.out.println(e); } } } 

当用户单击停止按钮时,将触发以下function,理论上应关闭输入流并中断阻塞byteChan.read(缓冲区)调用。 代码如下:

 public void stop() { t.interrupt(); serial.close(); } 

但是,当我运行此代码时,我从未得到ClosedByInterruptException,一旦输入流关闭,它应该触发。 此外,执行会在调用serial.close()时阻塞 – 因为底层输入流在读取调用时仍然阻塞。 我已经尝试用byteChan.close()替换中断调用,然后应该导致AsynchronousCloseException,但是,我得到了相同的结果。

对我所缺少的任何帮助将不胜感激。

您不能简单地通过包装它来将不支持可中断I / O的流创建到InterruptibleChannel (并且,无论如何, ReadableByteChannel不会扩展InterruptibleChannel )。

您必须查看基础InputStream的合约。 SerialPort.getInputStream()对其结果的可中断性有何评价? 如果它没有说什么,你应该假设它忽略了中断。

对于任何未明确支持可中断性的I / O,唯一的选择通常是从另一个线程关闭流。 这可能会立即引发在调用流时阻塞的线程中的IOException (尽管它可能不是AsynchronousCloseException )。

但是,即使这非常依赖于InputStream的实现 – 底层操作系统也是一个因素。


请注意newChannel()返回的ReadableByteChannelImpl类的源代码注释:

  private static class ReadableByteChannelImpl extends AbstractInterruptibleChannel // Not really interruptible implements ReadableByteChannel { InputStream in; ⋮ 

RXTX SerialInputStream(由serial.getInputStream()调用返回的内容)支持超时方案,最终解决了我的所有问题。 在创建新的SerialReader对象之前添加以下内容会导致读取不再无限期地阻塞:

 serial.enableReceiveTimeout(1000); 

在SerialReader对象中,我不得不改变一些东西直接从InputStream读取而不是创建ReadableByteChannel,但是现在,我可以毫无问题地停止并重新启动阅读器。

我正在使用下面的代码关闭rxtx。 我运行测试启动它们并关闭它们似乎工作正常。 我的读者看起来像:

 private void addPartsToQueue(final InputStream inputStream) { byte[] buffer = new byte[1024]; int len = -1; boolean first = true; // the read can throw try { while ((len = inputStream.read(buffer)) > -1) { if (len > 0) { if (first) { first = false; t0 = System.currentTimeMillis(); } else t1 = System.currentTimeMillis(); final String part = new String(new String(buffer, 0, len)); queue.add(part); //System.out.println(part + " " + (t1 - t0)); } try { Thread.sleep(sleep); } catch (InterruptedException e) { //System.out.println(Thread.currentThread().getName() + " interrupted " + e); break; } } } catch (IOException e) { System.err.println(Thread.currentThread().getName() + " " + e); //if(interruSystem.err.println(e); e.printStackTrace(); } //System.out.println(Thread.currentThread().getName() + " is ending."); } 

谢谢

 public void shutdown(final Device device) { shutdown(serialReaderThread); shutdown(messageAssemblerThread); serialPort.close(); if (device != null) device.setSerialPort(null); } public static void shutdown(final Thread thread) { if (thread != null) { //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); thread.interrupt(); //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); try { Thread.sleep(100); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e); } //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState()); try { thread.join(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " join interruped"); } //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState()); }