Multiple readers for an InputStream in Java

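I have an InputStream from which I am reading characters. I would like multiple readers to access this InputStream. A reasonable approach seems to be writing the incoming data to a StringBuffer or StringBuilder and letting the multiple readers read from that. Unfortunately, StringBufferInputStream is deprecated. StringReader reads a String, not a mutable object that is continuously being updated. What are my options? Write my own?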

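An input stream works like this: once you have read a portion of it, it is gone for good. You cannot go back and re-read it. What you can do is something like this: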

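    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;

    class InputStreamSplitter {
        private final BufferedReader reader;
        private final List<Listener> listeners = new ArrayList<>();

        InputStreamSplitter(InputStream toReadFrom) {
            // readLine() lives on BufferedReader, not on InputStreamReader.
            this.reader = new BufferedReader(new InputStreamReader(toReadFrom));
        }

        void addListener(Listener l) {
            this.listeners.add(l);
        }

        void work() throws IOException {
            String line;
            // Read the next line on every iteration so the loop terminates at EOF.
            while ((line = this.reader.readLine()) != null) {
                for (Listener l : this.listeners) {
                    l.processLine(line);
                }
            }
        }
    }

    interface Listener {
        void processLine(String line);
    }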

Have everyone who is interested implement Listener and add themselves to the InputStreamSplitter.
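As a usage sketch of the listener idea, the following reads each line once and hands it to two independent listeners. The names `ListenerDemo`, `fanOut` and `run` are made up for this illustration, and the splitter is repeated in compact form only so that the example compiles on its own.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ListenerDemo {
    interface Listener {
        void processLine(String line);
    }

    // Reads every line exactly once and passes it to each registered listener.
    static void fanOut(InputStream in, List<Listener> listeners) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            for (Listener l : listeners) {
                l.processLine(line);
            }
        }
    }

    // Runs two listeners over the same text and returns what each one collected.
    static List<List<String>> run(String text) throws IOException {
        List<String> seenByA = new ArrayList<>();
        List<String> seenByB = new ArrayList<>();
        List<Listener> listeners = new ArrayList<>();
        listeners.add(seenByA::add);                            // stores lines unchanged
        listeners.add(line -> seenByB.add(line.toUpperCase())); // stores an upper-cased copy
        fanOut(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), listeners);
        List<List<String>> result = new ArrayList<>();
        result.add(seenByA);
        result.add(seenByB);
        return result;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(run("first\nsecond\n"));
    }
}
```

The trade-off of this push model is that all listeners advance in lockstep: none of them can read ahead of or lag behind the others.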

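Note: My other answer is more general (and, in my opinion, better).

As @dimo414 points out, the answer below requires the first reader to always stay ahead of the second reader. If that is indeed the case, this answer may still be preferable, since it builds on standard classes.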


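To create two readers that read independently from the same source, you have to make sure they do not consume data from the same stream.

This can be achieved by combining TeeInputStream from Apache Commons with PipedInputStream and PipedOutputStream, as follows: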

    import java.io.*;
    import org.apache.commons.io.input.TeeInputStream;

    class Test {
        public static void main(String[] args) throws IOException {

            // Create the source input stream.
            InputStream is = new FileInputStream("filename.txt");

            // Create a piped input stream for one of the readers.
            PipedInputStream in = new PipedInputStream();

            // Create a tee-splitter for the other reader.
            TeeInputStream tee = new TeeInputStream(is, new PipedOutputStream(in));

            // Create the two buffered readers.
            BufferedReader br1 = new BufferedReader(new InputStreamReader(tee));
            BufferedReader br2 = new BufferedReader(new InputStreamReader(in));

            // Do some interleaved reads from them.
            System.out.println("One line from br1:");
            System.out.println(br1.readLine());
            System.out.println();

            System.out.println("Two lines from br2:");
            System.out.println(br2.readLine());
            System.out.println(br2.readLine());
            System.out.println();

            System.out.println("One line from br1:");
            System.out.println(br1.readLine());
            System.out.println();
        }
    }

Output:

    One line from br1:
    Line1: Lorem ipsum dolor sit amet,      <-- reading from start

    Two lines from br2:
    Line1: Lorem ipsum dolor sit amet,      <-- reading from start
    Line2: consectetur adipisicing elit,

    One line from br1:
    Line2: consectetur adipisicing elit,    <-- resumes on line 2
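To illustrate why the first reader has to stay ahead, here is a minimal JDK-only sketch. `MiniTee` is a hypothetical stand-in for TeeInputStream (it is not the Commons IO class), and `TeeOrderingDemo` is likewise invented for this example. It shows that the piped reader only ever sees bytes the tee reader has already consumed.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical stand-in for TeeInputStream: copies every byte it reads to a branch.
class MiniTee extends FilterInputStream {
    private final OutputStream branch;

    MiniTee(InputStream in, OutputStream branch) {
        super(in);
        this.branch = branch;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) {
            branch.write(b);
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) {
            branch.write(buf, off, n);
        }
        return n;
    }
}

class TeeOrderingDemo {
    // Reads `count` bytes through the tee, then reports how many bytes
    // the second (piped) reader could read without blocking.
    static int visibleToSecondReader(int count) throws IOException {
        InputStream source = new ByteArrayInputStream(new byte[100]);
        PipedInputStream second = new PipedInputStream();
        InputStream first = new MiniTee(source, new PipedOutputStream(second));
        for (int i = 0; i < count; i++) {
            first.read();
        }
        return second.available();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(visibleToSecondReader(0));
        System.out.println(visibleToSecondReader(5));
    }
}
```

The counts are kept small on purpose: a write into a full pipe blocks until the other side reads, so in a single thread the first reader must not run further ahead than the pipe's buffer size.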

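As you have probably noticed, once you have read a byte from an input stream, it is gone forever (unless you have saved it somewhere yourself).

The solution below does save the bytes until every subscribing input stream has read them.

It works as follows: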

    // Create a SplittableInputStream from the originalStream
    SplittableInputStream is = new SplittableInputStream(originalStream);

    // Fork this to get more input streams reading independently from originalStream
    SplittableInputStream is2 = is.split();
    SplittableInputStream is3 = is.split();

Each time split() is called on is, it yields a new InputStream that reads the bytes from the point at which it was split.

The SplittableInputStream looks as follows (copy'n'paste away!):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class SplittableInputStream extends InputStream {

        // Almost an input stream: The read-method takes an id.
        static class MultiplexedSource {

            static int MIN_BUF = 4096;

            // Underlying source
            private InputStream source;

            // Read positions of each SplittableInputStream
            private List<Integer> readPositions = new ArrayList<>();

            // Data to be read by the SplittableInputStreams
            int[] buffer = new int[MIN_BUF];

            // Last valid position in buffer
            int writePosition = 0;

            public MultiplexedSource(InputStream source) {
                this.source = source;
            }

            // Add a multiplexed reader. Return new reader id.
            int addSource(int splitId) {
                readPositions.add(splitId == -1 ? 0 : readPositions.get(splitId));
                return readPositions.size() - 1;
            }

            // Make room for more data (and drop data that has been read by
            // all readers)
            private void readjustBuffer() {
                int from = Collections.min(readPositions);
                int to = Collections.max(readPositions);
                int newLength = Math.max((to - from) * 2, MIN_BUF);
                int[] newBuf = new int[newLength];
                System.arraycopy(buffer, from, newBuf, 0, to - from);
                for (int i = 0; i < readPositions.size(); i++)
                    readPositions.set(i, readPositions.get(i) - from);
                writePosition -= from;
                buffer = newBuf;
            }

            // Read and advance position for given reader
            public int read(int readerId) throws IOException {

                // Enough data in buffer?
                if (readPositions.get(readerId) >= writePosition) {
                    readjustBuffer();
                    buffer[writePosition++] = source.read();
                }

                int pos = readPositions.get(readerId);
                int b = buffer[pos];
                if (b != -1)
                    readPositions.set(readerId, pos + 1);
                return b;
            }
        }

        // Non-root fields
        MultiplexedSource multiSource;
        int myId;

        // Public constructor: Used for first SplittableInputStream
        public SplittableInputStream(InputStream source) {
            multiSource = new MultiplexedSource(source);
            myId = multiSource.addSource(-1);
        }

        // Private constructor: Used in split()
        private SplittableInputStream(MultiplexedSource multiSource, int splitId) {
            this.multiSource = multiSource;
            myId = multiSource.addSource(splitId);
        }

        // Returns a new InputStream that will read bytes from this position
        // onwards.
        public SplittableInputStream split() {
            return new SplittableInputStream(multiSource, myId);
        }

        @Override
        public int read() throws IOException {
            return multiSource.read(myId);
        }
    }

Finally, a demo:

    String str = "Lorem ipsum\ndolor sit\namet\n";

    InputStream is = new ByteArrayInputStream(str.getBytes("UTF-8"));

    // Create the two buffered readers.
    SplittableInputStream is1 = new SplittableInputStream(is);
    SplittableInputStream is2 = is1.split();

    BufferedReader br1 = new BufferedReader(new InputStreamReader(is1));
    BufferedReader br2 = new BufferedReader(new InputStreamReader(is2));

    // Do some interleaved reads from them.
    System.out.println("One line from br1:");
    System.out.println(br1.readLine());
    System.out.println();

    System.out.println("Two lines from br2:");
    System.out.println(br2.readLine());
    System.out.println(br2.readLine());
    System.out.println();

    System.out.println("One line from br1:");
    System.out.println(br1.readLine());
    System.out.println();

Output:

    One line from br1:
    Lorem ipsum

    Two lines from br2:
    Lorem ipsum
    dolor sit

    One line from br1:
    dolor sit

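Use TeeInputStream to copy all the bytes read from the InputStream to a secondary OutputStream, e.g. a ByteArrayOutputStream.

Rather than using StringWriter / StringBufferInputStream, write your original InputStream to a ByteArrayOutputStream. Once you have finished reading from the original InputStream, pass the byte array returned by ByteArrayOutputStream.toByteArray to a ByteArrayInputStream. Use this InputStream as the one you hand to whatever else needs to read from it.

Essentially, all you are doing here is storing the contents of the original InputStream in an in-memory byte[] cache, as you originally tried to do with StringWriter / StringBufferInputStream.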
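The caching approach described above could be sketched roughly as follows, using only JDK classes. `CachedStream` and `newReader` are names made up for this illustration, and the sketch assumes the source is finite and small enough to fit in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: drains the source once, then hands out independent replays.
class CachedStream {
    private final byte[] data;

    CachedStream(InputStream source) throws IOException {
        ByteArrayOutputStream cache = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        // Copy everything from the source into the in-memory cache.
        while ((n = source.read(chunk)) != -1) {
            cache.write(chunk, 0, n);
        }
        this.data = cache.toByteArray();
    }

    // Each call returns a fresh stream positioned at the start of the cached bytes.
    InputStream newReader() {
        return new ByteArrayInputStream(data);
    }

    public static void main(String[] args) throws IOException {
        InputStream original =
                new ByteArrayInputStream("line1\nline2\n".getBytes(StandardCharsets.UTF_8));
        CachedStream cached = new CachedStream(original);
        InputStream a = cached.newReader();
        InputStream b = cached.newReader();
        a.read();                            // advancing a ...
        System.out.println((char) b.read()); // ... does not move b
    }
}
```

Unlike the piped variants, no reader can start until the whole source has been consumed, so this does not suit streams that never end.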

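Here is another way to read from two streams independently, without assuming that one stays ahead of the other, yet still using standard classes. It does, however, eagerly read from the underlying input stream in the background, which may be undesirable depending on your application.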

    // Requires java.io.* plus, from Apache Commons IO,
    // org.apache.commons.io.IOUtils and org.apache.commons.io.output.TeeOutputStream.

    public static void main(String[] args) throws IOException {
        // Create the source input stream.
        InputStream is = new ByteArrayInputStream("line1\nline2\nline3".getBytes());

        // Create a piped input stream for each reader.
        PipedInputStream in1 = new PipedInputStream();
        PipedInputStream in2 = new PipedInputStream();

        // Start copying the input stream to both piped input streams.
        startCopy(is, new TeeOutputStream(
                new PipedOutputStream(in1), new PipedOutputStream(in2)));

        // Create the two buffered readers.
        BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
        BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));

        // Do some interleaved reads from them.
        // ...
    }

    // The parameters are final so the anonymous Thread can capture them.
    private static void startCopy(final InputStream in, final OutputStream out) {
        (new Thread() {
            public void run() {
                try {
                    IOUtils.copy(in, out);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

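I found this thread while looking for a way to have an output stream send its bytes to two or more different InputStreams. Unfortunately, the solutions here all pointed to PipedOutputStream and PipedInputStream, so I ended up writing a PipedOutputStream extension. Here it is. The example is written in the class's "main" method.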

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;

    /**
     * Extension of {@link PipedOutputStream} that can be connected to more than one
     * {@link PipedInputStream}.
     */
    public class PipedOutputStreamEx extends PipedOutputStream {

        public PipedOutputStreamEx() {
        }

        /* REMIND: identification of the read and write sides needs to be
           more sophisticated.  Either using thread groups (but what about
           pipes within a thread?) or using finalization (but it may be a
           long time until the next GC). */
        private PipedInputStreamEx[] sinks = null;

        public synchronized void connect(PipedInputStreamEx... pIns) throws IOException {
            for (PipedInputStreamEx snk : pIns) {
                if (snk == null) {
                    throw new NullPointerException();
                } else if (sinks != null || snk.connected) {
                    throw new IOException("Already connected");
                }
                snk.in = -1;
                snk.out = 0;
                snk.connected = true;
            }
            this.sinks = pIns;
        }

        /**
         * Writes the specified byte to the piped output stream.
         *
         * Implements the write method of OutputStream.
         *
         * @param      b   the byte to be written.
         * @exception IOException if the pipe is broken,
         *          {@link #connect(java.io.PipedInputStream) unconnected},
         *          closed, or if an I/O error occurs.
         */
        public void write(int b) throws IOException {
            if (this.sinks == null) {
                throw new IOException("Pipe(s) not connected");
            }
            for (PipedInputStreamEx sink : this.sinks) {
                sink.receive(b);
            }
        }

        /**
         * Writes len bytes from the specified byte array
         * starting at offset off to this piped output stream.
         * This method blocks until all the bytes are written to the output
         * stream.
         *
         * @param      b     the data.
         * @param      off   the start offset in the data.
         * @param      len   the number of bytes to write.
         * @exception IOException if the pipe is broken,
         *          {@link #connect(java.io.PipedInputStream) unconnected},
         *          closed, or if an I/O error occurs.
         */
        public void write(byte b[], int off, int len) throws IOException {
            if (sinks == null) {
                throw new IOException("Pipe not connected");
            } else if (b == null) {
                throw new NullPointerException();
            } else if ((off < 0) || (off > b.length) || (len < 0) ||
                       ((off + len) > b.length) || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            } else if (len == 0) {
                return;
            }
            for (PipedInputStreamEx sink : this.sinks) {
                sink.receive(b, off, len);
            }
        }

        /**
         * Flushes this output stream and forces any buffered output bytes
         * to be written out.
         * This will notify any readers that bytes are waiting in the pipe.
         *
         * @exception IOException if an I/O error occurs.
         */
        public synchronized void flush() throws IOException {
            if (sinks != null) {
                for (PipedInputStreamEx sink : this.sinks) {
                    synchronized (sink) {
                        sink.notifyAll();
                    }
                }
            }
        }

        /**
         * Closes this piped output stream and releases any system resources
         * associated with this stream. This stream may no longer be used for
         * writing bytes.
         *
         * @exception  IOException  if an I/O error occurs.
         */
        public void close() throws IOException {
            if (sinks != null) {
                for (PipedInputStreamEx sink : this.sinks) {
                    sink.receivedLast();
                }
            }
        }

        /**
         * Test of this extension of {@link PipedOutputStream}.
         *
         * @param args
         * @throws InterruptedException
         * @throws IOException
         */
        public static void main(String[] args) throws InterruptedException, IOException {
            final PipedOutputStreamEx pOut = new PipedOutputStreamEx();
            final PipedInputStreamEx pInHash = new PipedInputStreamEx();
            final PipedInputStreamEx pInConsole = new PipedInputStreamEx();

            pOut.connect(pInHash, pInConsole);

            Thread escreve = new Thread("Escrevendo") {
                @Override
                public void run() {
                    // Test data: 20 lines, "linha1" .. "linha20", each followed by
                    // a space, 100 digits ("0123456789" repeated 10 times) and "\n".
                    StringBuilder digits = new StringBuilder();
                    for (int i = 0; i < 10; i++) {
                        digits.append("0123456789");
                    }
                    String[] paraGravar = new String[20];
                    for (int i = 0; i < paraGravar.length; i++) {
                        paraGravar[i] = "linha" + (i + 1) + " " + digits + "\n";
                    }
                    for (String s : paraGravar) {
                        try {
                            pOut.write(s.getBytes("ISO-8859-1"));
                            Thread.sleep(100);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                    try {
                        pOut.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            };

            Thread le1 = new Thread("Le1 - hash") {
                @Override
                public void run() {
                    try {
                        // HashUtil is a helper class that is not shown in this post.
                        System.out.println("HASH: " + HashUtil.getHashCRC(pInHash, true));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };

            Thread le2 = new Thread("Le2 - escreve no console") {
                @Override
                public void run() {
                    BufferedReader bIn = new BufferedReader(new InputStreamReader(pInConsole));
                    String s;
                    try {
                        while ((s = bIn.readLine()) != null) {
                            Thread.sleep(700); // simulates a slow reader
                            System.out.println(s);
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };

            escreve.start();
            le1.start();
            le2.start();

            escreve.join();
            le1.join();
            le2.join();

            pInHash.close();
            pInConsole.close();
        }
    }

Here is the PipedInputStreamEx code. Unfortunately, I had to copy all of the JDK code in order to access the "connected", "in" and "out" fields.

    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;

    /**
     * Extension of {@link PipedInputStream} that allows more than one of these to be
     * connected to a {@link PipedOutputStream}.
     * Because the ancestor class has package-private fields, the inherited code had to be copied :/
     */
    public class PipedInputStreamEx extends PipedInputStream {

        @Override
        public void connect(PipedOutputStream src) throws IOException {
            throw new IOException("conecte usando PipedOutputStream.connect()");
        }

        //----------------------------------------------------------------------
        //--------- START of code from the inherited class (some methods commented out...)
        //----------------------------------------------------------------------

        boolean closedByWriter = false;
        volatile boolean closedByReader = false;
        boolean connected = false;

        /* REMIND: identification of the read and write sides needs to be
           more sophisticated.  Either using thread groups (but what about
           pipes within a thread?) or using finalization (but it may be a
           long time until the next GC). */
        Thread readSide;
        Thread writeSide;

        private static final int DEFAULT_PIPE_SIZE = 1024;

        /**
         * The default size of the pipe's circular input buffer.
         * @since   JDK1.1
         */
        // This used to be a constant before the pipe size was allowed
        // to change. This field will continue to be maintained
        // for backward compatibility.
        protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

        /**
         * The circular buffer into which incoming data is placed.
         * @since   JDK1.1
         */
        protected byte buffer[];

        /**
         * The index of the position in the circular buffer at which the
         * next byte of data will be stored when received from the connected
         * piped output stream. in<0 implies the buffer is empty,
         * in==out implies the buffer is full
         * @since   JDK1.1
         */
        protected int in = -1;

        /**
         * The index of the position in the circular buffer at which the next
         * byte of data will be read by this piped input stream.
         * @since   JDK1.1
         */
        protected int out = 0;

    //    /**
    //     * Creates a PipedInputStream so
    //     * that it is connected to the piped output
    //     * stream src. Data bytes written
    //     * to src will then be available
    //     * as input from this stream.
    //     *
    //     * @param      src   the stream to connect to.
    //     * @exception  IOException  if an I/O error occurs.
    //     */
    //    public PipedInputStream(PipedOutputStream src) throws IOException {
    //        this(src, DEFAULT_PIPE_SIZE);
    //    }
    //
    //    /**
    //     * Creates a PipedInputStream so that it is
    //     * connected to the piped output stream
    //     * src and uses the specified pipe size for
    //     * the pipe's buffer.
    //     * Data bytes written to src will then
    //     * be available as input from this stream.
    //     *
    //     * @param      src   the stream to connect to.
    //     * @param      pipeSize the size of the pipe's buffer.
    //     * @exception  IOException  if an I/O error occurs.
    //     * @exception  IllegalArgumentException if pipeSize <= 0.
    //     * @since      1.6
    //     */
    //    public PipedInputStream(PipedOutputStream src, int pipeSize)
    //            throws IOException {
    //        initPipe(pipeSize);
    //        connect(src);
    //    }

        /**
         * Creates a PipedInputStream so
         * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
         * connected}.
         * It must be {@linkplain java.io.PipedOutputStream#connect(
         * java.io.PipedInputStream) connected} to a
         * PipedOutputStream before being used.
         */
        public PipedInputStreamEx() {
            initPipe(DEFAULT_PIPE_SIZE);
        }

        /**
         * Creates a PipedInputStream so that it is not yet
         * {@linkplain #connect(java.io.PipedOutputStream) connected} and
         * uses the specified pipe size for the pipe's buffer.
         * It must be {@linkplain java.io.PipedOutputStream#connect(
         * java.io.PipedInputStream)
         * connected} to a PipedOutputStream before being used.
         *
         * @param      pipeSize the size of the pipe's buffer.
         * @exception  IllegalArgumentException if pipeSize <= 0.
         * @since      1.6
         */
        public PipedInputStreamEx(int pipeSize) {
            initPipe(pipeSize);
        }

        private void initPipe(int pipeSize) {
            if (pipeSize <= 0) {
                throw new IllegalArgumentException("Pipe Size <= 0");
            }
            buffer = new byte[pipeSize];
        }

    //    /**
    //     * Causes this piped input stream to be connected
    //     * to the piped output stream src.
    //     * If this object is already connected to some
    //     * other piped output stream, an IOException
    //     * is thrown.
    //     *
    //     * If src is an
    //     * unconnected piped output stream and snk
    //     * is an unconnected piped input stream, they
    //     * may be connected by either the call:
    //     *
    //     *     snk.connect(src)
    //     *
    //     * or the call:
    //     *
    //     *     src.connect(snk)
    //     *
    //     * The two
    //     * calls have the same effect.
    //     *
    //     * @param      src   The piped output stream to connect to.
    //     * @exception  IOException  if an I/O error occurs.
    //     */
    //    public void connect(PipedOutputStream src) throws IOException {
    //        src.connect(this);
    //    }

        /**
         * Receives a byte of data.  This method will block if no input is
         * available.
         * @param b the byte being received
         * @exception IOException If the pipe is broken,
         *          {@link #connect(java.io.PipedOutputStream) unconnected},
         *          closed, or if an I/O error occurs.
         * @since     JDK1.1
         */
        protected synchronized void receive(int b) throws IOException {
            checkStateForReceive();
            writeSide = Thread.currentThread();
            if (in == out)
                awaitSpace();
            if (in < 0) {
                in = 0;
                out = 0;
            }
            buffer[in++] = (byte)(b & 0xFF);
            if (in >= buffer.length) {
                in = 0;
            }
        }

        /**
         * Receives data into an array of bytes.  This method will
         * block until some input is available.
         * @param b the buffer into which the data is received
         * @param off the start offset of the data
         * @param len the maximum number of bytes received
         * @exception IOException If the pipe is broken,
         *           {@link #connect(java.io.PipedOutputStream) unconnected},
         *           closed, or if an I/O error occurs.
         */
        synchronized void receive(byte b[], int off, int len) throws IOException {
            checkStateForReceive();
            writeSide = Thread.currentThread();
            int bytesToTransfer = len;
            while (bytesToTransfer > 0) {
                if (in == out)
                    awaitSpace();
                int nextTransferAmount = 0;
                if (out < in) {
                    nextTransferAmount = buffer.length - in;
                } else if (in < out) {
                    if (in == -1) {
                        in = out = 0;
                        nextTransferAmount = buffer.length - in;
                    } else {
                        nextTransferAmount = out - in;
                    }
                }
                if (nextTransferAmount > bytesToTransfer)
                    nextTransferAmount = bytesToTransfer;
                assert(nextTransferAmount > 0);
                System.arraycopy(b, off, buffer, in, nextTransferAmount);
                bytesToTransfer -= nextTransferAmount;
                off += nextTransferAmount;
                in += nextTransferAmount;
                if (in >= buffer.length) {
                    in = 0;
                }
            }
        }

        private void checkStateForReceive() throws IOException {
            if (!connected) {
                throw new IOException("Pipe not connected");
            } else if (closedByWriter || closedByReader) {
                throw new IOException("Pipe closed");
            } else if (readSide != null && !readSide.isAlive()) {
                throw new IOException("Read end dead");
            }
        }

        private void awaitSpace() throws IOException {
            while (in == out) {
                checkStateForReceive();

                /* full: kick any waiting readers */
                notifyAll();
                try {
                    wait(1000);
                } catch (InterruptedException ex) {
                    throw new java.io.InterruptedIOException();
                }
            }
        }

        /**
         * Notifies all waiting threads that the last byte of data has been
         * received.
         */
        synchronized void receivedLast() {
            closedByWriter = true;
            notifyAll();
        }

        /**
         * Reads the next byte of data from this piped input stream. The
         * value byte is returned as an int in the range
         * 0 to 255.
         * This method blocks until input data is available, the end of the
         * stream is detected, or an exception is thrown.
         *
         * @return     the next byte of data, or -1 if the end of the
         *             stream is reached.
         * @exception  IOException  if the pipe is
         *           {@link #connect(java.io.PipedOutputStream) unconnected},
         *           broken, closed,
         *           or if an I/O error occurs.
         */
        public synchronized int read() throws IOException {
            if (!connected) {
                throw new IOException("Pipe not connected");
            } else if (closedByReader) {
                throw new IOException("Pipe closed");
            } else if (writeSide != null && !writeSide.isAlive()
                       && !closedByWriter && (in < 0)) {
                throw new IOException("Write end dead");
            }

            readSide = Thread.currentThread();
            int trials = 2;
            while (in < 0) {
                if (closedByWriter) {
                    /* closed by writer, return EOF */
                    return -1;
                }
                if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                    throw new IOException("Pipe broken");
                }
                /* might be a writer waiting */
                notifyAll();
                try {
                    wait(1000);
                } catch (InterruptedException ex) {
                    throw new java.io.InterruptedIOException();
                }
            }
            int ret = buffer[out++] & 0xFF;
            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }

            return ret;
        }

        /**
         * Reads up to len bytes of data from this piped input
         * stream into an array of bytes. Less than len bytes
         * will be read if the end of the data stream is reached or if
         * len exceeds the pipe's buffer size.
         * If len is zero, then no bytes are read and 0 is returned;
         * otherwise, the method blocks until at least 1 byte of input is
         * available, end of the stream has been detected, or an exception is
         * thrown.
         *
         * @param      b     the buffer into which the data is read.
         * @param      off   the start offset in the destination array b
         * @param      len   the maximum number of bytes read.
         * @return     the total number of bytes read into the buffer, or
         *             -1 if there is no more data because the end of
         *             the stream has been reached.
         * @exception  NullPointerException If b is null.
         * @exception  IndexOutOfBoundsException If off is negative,
         *             len is negative, or len is greater than
         *             b.length - off
         * @exception  IOException if the pipe is broken,
         *           {@link #connect(java.io.PipedOutputStream) unconnected},
         *           closed, or if an I/O error occurs.
         */
        public synchronized int read(byte b[], int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException();
            } else if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            } else if (len == 0) {
                return 0;
            }

            /* possibly wait on the first character */
            int c = read();
            if (c < 0) {
                return -1;
            }
            b[off] = (byte) c;
            int rlen = 1;
            while ((in >= 0) && (len > 1)) {

                int available;

                if (in > out) {
                    available = Math.min((buffer.length - out), (in - out));
                } else {
                    available = buffer.length - out;
                }

                // A byte is read beforehand outside the loop
                if (available > (len - 1)) {
                    available = len - 1;
                }
                System.arraycopy(buffer, out, b, off + rlen, available);
                out += available;
                rlen += available;
                len -= available;

                if (out >= buffer.length) {
                    out = 0;
                }
                if (in == out) {
                    /* now empty */
                    in = -1;
                }
            }
            return rlen;
        }

        /**
         * Returns the number of bytes that can be read from this input
         * stream without blocking.
         *
         * @return the number of bytes that can be read from this input stream
         *         without blocking, or {@code 0} if this input stream has been
         *         closed by invoking its {@link #close()} method, or if the pipe
         *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
         *         broken.
         *
         * @exception  IOException  if an I/O error occurs.
         * @since   JDK1.0.2
         */
        public synchronized int available() throws IOException {
            if (in < 0)
                return 0;
            else if (in == out)
                return buffer.length;
            else if (in > out)
                return in - out;
            else
                return in + buffer.length - out;
        }

        /**
         * Closes this piped input stream and releases any system resources
         * associated with the stream.
         *
         * @exception  IOException  if an I/O error occurs.
         */
        public void close() throws IOException {
            closedByReader = true;
            synchronized (this) {
                in = -1;
            }
        }

        //----------------------------------------------------------------------
        //--------- END of code from the inherited class
        //----------------------------------------------------------------------
    }