Java中的输入和输出流管道

有没有人有任何关于在Java中创建Pipe对象的好建议,它既是InputStream又是OutputStream,因为Java没有多重inheritance,而且这两个流都是抽象类而不是接口?

基本需求是有一个对象可以传递给需要InputStream或OutputStream的东西来管道从一个线程输出到另一个线程的输入。

似乎错过了这个问题的重点。 如果我理解正确,你需要一个在一个线程中像InputStream一样工作的对象,在另一个线程中想要一个OutputStream来创建一个在两个线程之间进行通信的方法。

也许一个答案是使用组合而不是inheritance(无论如何都是推荐的练习)。 使用getInputStream()和getOutputStream()方法创建一个包含彼此连接的PipedInputStream和PipedOutputStream的管道。

您不能直接将Pipe对象传递给需要流的东西,但是您可以传递它的get方法的返回值来执行它。

那对你有用吗?

java.io.PipedOutputStream和java.io.PipedInputStream看起来是用于此场景的类。 它们被设计为一起使用以在线程之间管道数据。

如果你真的想要一些单个对象传递它,则需要包含其中一个并通过getter公开它们。

我认为这是很常见的事情。 看到这个问题。

将Java InputStream的内容写入OutputStream的简便方法

你不能创建一个从InputStreamOutputStream派生的类,因为它们不是接口,它们有通用的方法,Java不允许多重inheritance(编译器不知道是否调用InputStream.close()OutputStream.close()如果你在新对象上调用close() )。

另一个问题是缓冲区。 Java希望为数据分配一个静态缓冲区(不会改变)。 这意味着当你使用`java.io.PipedXxxStream’时,除非你使用两个不同的线程,否则写入数据最终会被阻塞。

所以Apocalisp的答案是正确的:你必须编写一个复制循环。

我建议您在项目中包含Apache的commons-io,其中包含许多帮助例程,仅用于此类任务(在流,文件,字符串及其所有组合之间复制数据)。

见http://ostermiller.org/utils/CircularBuffer.html

我必须实现一个缓慢连接到Servlet的filter,所以基本上我将servlet输出流包装到QueueOutputStream中,它将每个字节(在小缓冲区中)添加到队列中,然后将这些小缓冲区输出到第二个输出流,所以在某种程度上,它充当输入/输出流,恕我直言这比JDK管道更好,它不能很好地扩展,基本上在标准JDK实现中有太多的上下文切换(每次读/写),阻塞队列只是适用于单一生产者/消费者场景:

 import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.*; public class QueueOutputStream extends OutputStream { private static final int DEFAULT_BUFFER_SIZE=1024; private static final byte[] END_SIGNAL=new byte[]{}; private final BlockingQueue queue=new LinkedBlockingDeque<>(); private final byte[] buffer; private boolean closed=false; private int count=0; public QueueOutputStream() { this(DEFAULT_BUFFER_SIZE); } public QueueOutputStream(final int bufferSize) { if(bufferSize<=0){ throw new IllegalArgumentException("Buffer size <= 0"); } this.buffer=new byte[bufferSize]; } private synchronized void flushBuffer() { if(count>0){ final byte[] copy=new byte[count]; System.arraycopy(buffer,0,copy,0,count); queue.offer(copy); count=0; } } @Override public synchronized void write(final int b) throws IOException { if(closed){ throw new IllegalStateException("Stream is closed"); } if(count>=buffer.length){ flushBuffer(); } buffer[count++]=(byte)b; } @Override public synchronized void write(final byte[] b, final int off, final int len) throws IOException { super.write(b,off,len); } @Override public synchronized void close() throws IOException { flushBuffer(); queue.offer(END_SIGNAL); closed=true; } public Future asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) { return executor.submit( new Callable() { @Override public Void call() throws Exception { try{ byte[] buffer=queue.take(); while(buffer!=END_SIGNAL){ outputStream.write(buffer); buffer=queue.take(); } outputStream.flush(); } catch(Exception e){ close(); throw e; } finally{ outputStream.close(); } return null; } } ); }