当我无缘无故地写入水槽时,NIO Pipe抛出“Broken Pipe”! 怎么调试?

我相信我已经做好了一切。 我创建一个管道,将接收器传递给写入器线程,使用OP_READ在我的选择器上注册源,启动我的选择器。 一切正常,但一旦我向水槽写东西,我就会得到一个破损的管道exception。 为什么!!! ??? 这里没有破裂的管道。 我烦了。 我如何调试/了解这里发生的事情? 有没有人有一个简单的管道示例,我可以运行来测试这是否有效。 在接收器上写入的线程和读取它的选择器。

编辑:我几乎遵循这里的建议。 在互联网上很难找到NIO管道的具体例子。

import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class SystemOutPipe extends Thread { public static void main(String[] args) { try { SystemOutPipe sop = new SystemOutPipe(); sop.start(); System.out.println("This message should be redirected to System.err\nNow waiting 5 seconds ..."); Thread.sleep(5000L); sop.setStopped(true); sop.join(); } catch (Exception e) { e.printStackTrace(); } } private Selector selector; private Pipe pipe; private boolean stopped = false; public SystemOutPipe() throws IOException { super("SystemOutPipe"); pipe = Pipe.open(); System.setOut(new PrintStream(new PipeOutputStream(pipe))); selector = Selector.open(); pipe.source().configureBlocking(false); pipe.source().register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } @Override public void run() { try { while (!isStopped()) { int n = selector.select(1L); if (n > 0) { Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isReadable()) { new ReadHandler(key).run(); } } } } } catch (Exception e) { e.printStackTrace(); // writes to System.err ! } } public synchronized boolean isStopped() { return stopped; } public synchronized void setStopped(final boolean stopped) { this.stopped = stopped; } public class ReadHandler implements Runnable { private final SelectionKey key; public ReadHandler(final SelectionKey key) { this.key = key; } @Override public void run() { ByteBuffer bbuf = (ByteBuffer) key.attachment(); ReadableByteChannel channel = (ReadableByteChannel) key.channel(); try { int count = 0; do { bbuf.clear(); count = channel.read(bbuf); if (count > 0) System.err.write(bbuf.array(), 0, count); } while(count > 0); } catch (IOException e) { e.printStackTrace(); key.cancel(); } } } public class PipeOutputStream extends OutputStream { private final Pipe pipe; public PipeOutputStream(final Pipe pipe) { this.pipe = pipe; } @Override public void write(final int b) throws IOException { write(new byte[] { (byte) b }); } @Override public void write(final byte[] b) throws IOException { write(b, 0, b.length); } @Override public void write(final byte[] b, final int off, final int len) throws IOException { ByteBuffer bbuf = ByteBuffer.wrap(b, off, len); bbuf.position(len); bbuf.flip(); int count = 0; while (count < len) { int n = pipe.sink().write(bbuf); if (n == 0) { // let's wait a bit and not consume cpu try { Thread.sleep(1L); } catch (InterruptedException e) { throw new IOException(e); } } else count += n; } } } } 

例外:

 java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcher.write0(Native Method) at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:72) at sun.nio.ch.IOUtil.write(IOUtil.java:43) at sun.nio.ch.SinkChannelImpl.write(SinkChannelImpl.java:149) at com.niostuff.util.GCLogInterceptor.fileModified(GCLogInterceptor.java:180) at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$WatchData.notifyFileModified(Unknown Source) at net.contentobjects.jnotify.linux.JNotifyAdapterLinux.notifyChangeEvent(Unknown Source) at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$1.notify(Unknown Source) at net.contentobjects.jnotify.linux.JNotify_linux.callbackProcessEvent(Unknown Source) at net.contentobjects.jnotify.linux.JNotify_linux.nativeNotifyLoop(Native Method) at net.contentobjects.jnotify.linux.JNotify_linux.access$000(Unknown Source) at net.contentobjects.jnotify.linux.JNotify_linux$1.run(Unknown Source) 

好的,所以我发现了问题。 首先要感谢所有想要帮助的人。 希望你能从我的错误中吸取教训。 事件链是:

1 – 我没有耗尽接收缓冲区(源通道读入的缓冲区),最终它已满。

2 – 现在它已满,pipeSourceChannel.read(readBuffer)返回0个字节。 有数据要读取,但无法读取完整的缓冲区。

3 – 这导致频道被关闭(我自己在bytesRead == 0上这样做)和BrokenPipe。

我在这里学到的一课:PIPES很棘手。 我认为非阻塞并发队列使用起来要简单得多,就像这里提到的那样: Java NIO Pipe vs BlockingQueue