线程中断没有结束阻塞调用输入流读取
我正在使用RXTX从串口读取数据。 读取是在以下列方式生成的线程内完成的:
CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port); CommPort comm = portIdentifier.open("Whatever", 2000); SerialPort serial = (SerialPort)comm; ...settings Thread t = new Thread(new SerialReader(serial.getInputStream())); t.start();
SerialReader类实现Runnable并且无限循环,从端口读取并将数据构建到有用的包中,然后再将其发送到其他应用程序。 但是,我把它简化为以下简单:
public void run() { ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader ByteBuffer buffer = ByteBuffer.allocate(100); while (true) { try { byteChan.read(buffer); } catch (Exception e) { System.out.println(e); } } }
当用户单击停止按钮时,将触发以下function,理论上应关闭输入流并中断阻塞byteChan.read(缓冲区)调用。 代码如下:
public void stop() { t.interrupt(); serial.close(); }
但是,当我运行此代码时,我从未得到ClosedByInterruptException,一旦输入流关闭,它应该触发。 此外,执行会在调用serial.close()时阻塞 – 因为底层输入流在读取调用时仍然阻塞。 我已经尝试用byteChan.close()替换中断调用,然后应该导致AsynchronousCloseException,但是,我得到了相同的结果。
对我所缺少的任何帮助将不胜感激。
您不能简单地通过包装它来将不支持可中断I / O的流创建到InterruptibleChannel
(并且,无论如何, ReadableByteChannel
不会扩展InterruptibleChannel
)。
您必须查看基础InputStream
的合约。 SerialPort.getInputStream()
对其结果的可中断性有何评价? 如果它没有说什么,你应该假设它忽略了中断。
对于任何未明确支持可中断性的I / O,唯一的选择通常是从另一个线程关闭流。 这可能会立即引发在调用流时阻塞的线程中的IOException
(尽管它可能不是AsynchronousCloseException
)。
但是,即使这非常依赖于InputStream
的实现 – 底层操作系统也是一个因素。
请注意newChannel()
返回的ReadableByteChannelImpl
类的源代码注释:
private static class ReadableByteChannelImpl extends AbstractInterruptibleChannel // Not really interruptible implements ReadableByteChannel { InputStream in; ⋮
RXTX SerialInputStream(由serial.getInputStream()调用返回的内容)支持超时方案,最终解决了我的所有问题。 在创建新的SerialReader对象之前添加以下内容会导致读取不再无限期地阻塞:
serial.enableReceiveTimeout(1000);
在SerialReader对象中,我不得不改变一些东西直接从InputStream读取而不是创建ReadableByteChannel,但是现在,我可以毫无问题地停止并重新启动阅读器。
我正在使用下面的代码关闭rxtx。 我运行测试启动它们并关闭它们似乎工作正常。 我的读者看起来像:
private void addPartsToQueue(final InputStream inputStream) { byte[] buffer = new byte[1024]; int len = -1; boolean first = true; // the read can throw try { while ((len = inputStream.read(buffer)) > -1) { if (len > 0) { if (first) { first = false; t0 = System.currentTimeMillis(); } else t1 = System.currentTimeMillis(); final String part = new String(new String(buffer, 0, len)); queue.add(part); //System.out.println(part + " " + (t1 - t0)); } try { Thread.sleep(sleep); } catch (InterruptedException e) { //System.out.println(Thread.currentThread().getName() + " interrupted " + e); break; } } } catch (IOException e) { System.err.println(Thread.currentThread().getName() + " " + e); //if(interruSystem.err.println(e); e.printStackTrace(); } //System.out.println(Thread.currentThread().getName() + " is ending."); }
谢谢
public void shutdown(final Device device) { shutdown(serialReaderThread); shutdown(messageAssemblerThread); serialPort.close(); if (device != null) device.setSerialPort(null); } public static void shutdown(final Thread thread) { if (thread != null) { //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); thread.interrupt(); //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); try { Thread.sleep(100); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e); } //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState()); try { thread.join(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " join interruped"); } //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState()); }