如何在Java中合并两个输入流?

在Java中有两个InputStream,有没有办法合并它们,所以你用一个InputStream结束,它给你两个流的输出? 怎么样?

如评论所说,合并时你的意思并不清楚。

从任一“随机”获取可用输入由InputStream.available复杂化。可能不一定为您提供有用的答案和阻止流的行为。 您需要两个线程从流中读取,然后通过java.io.Piped(In|Out)putStream传递数据(尽管这些类有问题)。 或者对于某些类型的流,可以使用不同的接口,例如java.nio非阻塞信道。

如果你想要第一个输入流的全部内容后跟第二个: new java.io.SequenceInputStream(s1, s2)

java.io.SequenceInputStream可能就是你所需要的。 它接受流的枚举,并将输出第一个流的内容,然后输出第二个流的内容,依此类推,直到所有流都为空。

您可以编写执行此操作的自定义InputStream实现。 例:

 import java.io.IOException; import java.io.InputStream; import java.util.Collections; import java.util.Deque; import java.util.LinkedList; public class CatInputStream extends InputStream { private final Deque streams; public CatInputStream(InputStream... streams) { this.streams = new LinkedList(); Collections.addAll(this.streams, streams); } private void nextStream() throws IOException { streams.removeFirst().close(); } @Override public int read() throws IOException { int result = -1; while (!streams.isEmpty() && (result = streams.getFirst().read()) == -1) { nextStream(); } return result; } @Override public int read(byte b[], int off, int len) throws IOException { int result = -1; while (!streams.isEmpty() && (result = streams.getFirst().read(b, off, len)) == -1) { nextStream(); } return result; } @Override public long skip(long n) throws IOException { long skipped = 0L; while (skipped < n && !streams.isEmpty()) { int thisSkip = streams.getFirst().skip(n - skipped); if (thisSkip > 0) skipped += thisSkip; else nextStream(); } return skipped; } @Override public int available() throws IOException { return streams.isEmpty() ? 0 : streams.getFirst().available(); } @Override public void close() throws IOException { while (!streams.isEmpty()) nextStream(); } } 

此代码未经过测试,因此您的里程可能会有所不同。

不是我能想到的。 您可能必须将两个流的内容读入byte [],然后从中创建ByteArrayInputStream。

这是一个特定于字节数组的MVar实现(确保添加自己的包定义)。 从这里开始,在合并流上编写输入流是微不足道的。 如果要求我也可以发布。

 import java.nio.ByteBuffer; public final class MVar { private static enum State { EMPTY, ONE, MANY } private final Object lock; private State state; private byte b; private ByteBuffer bytes; private int length; public MVar() { lock = new Object(); state = State.EMPTY; } public final void put(byte b) { synchronized (lock) { while (state != State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } this.b = b; state = State.ONE; lock.notifyAll(); } } public final void put(byte[] bytes, int offset, int length) { if (length == 0) { return; } synchronized (lock) { while (state != State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } this.bytes = ByteBuffer.allocateDirect(length); this.bytes.put(bytes, offset, length); this.bytes.position(0); this.length = length; state = State.MANY; lock.notifyAll(); } } public final byte take() { synchronized (lock) { while (state == State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } switch (state) { case ONE: { state = State.EMPTY; byte b = this.b; lock.notifyAll(); return b; } case MANY: { byte b = bytes.get(); state = --length <= 0 ? State.EMPTY : State.MANY; lock.notifyAll(); return b; } default: throw new AssertionError(); } } } public final int take(byte[] bytes, int offset, int length) { if (length == 0) { return 0; } synchronized (lock) { while (state == State.EMPTY) { try { lock.wait(); } catch (InterruptedException e) {} } switch (state) { case ONE: bytes[offset] = b; state = State.EMPTY; lock.notifyAll(); return 1; case MANY: if (this.length > length) { this.bytes.get(bytes, offset, length); this.length = this.length - length; synchronized (lock) { lock.notifyAll(); } return length; } this.bytes.get(bytes, offset, this.length); this.bytes = null; state = State.EMPTY; length = this.length; lock.notifyAll(); return length; default: throw new AssertionError(); } } } }