以并行方式将`BufferedReader`转换为`Stream `

有没有办法从BufferedReader reader接收Stream stream ,这样stream中的每个字符串代表一行reader ,其中附加条件是直接提供streamreader读取所有内容之前)? 我想处理stream并行的数据,以便从reader那里获取它们以节省时间。

编辑:我想处理与阅读并行的数据。 我不想并行处理不同的行。 它们应该按顺序处理。

让我们举例说明我希望如何节省时间。 假设我们的reader将向我们展示100行。 读取一行需要2 ms,处理1 ms需要1 ms。 如果我先读取所有行然后处理它们,将需要300毫秒。 我想要做的是:一旦读取一行,我想处理它并且并行读取下一行。 总时间将为201毫秒。

我不喜欢BufferedReader.lines() :据我所知,当我想处理这些行时,读取就开始了。 我们假设我已经有了我的reader但在能够处理第一行之前必须进行预计算。 假设它们花费30毫秒。 在上面的例子中,使用reader.lines()总时间为231毫秒或301毫秒(你能告诉我哪些时间是正确的吗?)。 但是有可能在201毫秒内完成工作,因为预计算可以与读取前15行并行完成。

您可以使用reader.lines().parallel() 。 这样,您的输入将被拆分为块,并且将在块上并行执行进一步的流操作。 如果进一步的操作需要很长时间,那么您可能会获得性能提升。

在你的情况下,默认启发式将无法正常工作,我想没有现成的解决方案,允许您使用单行批处理。 您可以编写一个自定义分裂器,它将在每行之后分割。 查看java.util.Spliterators.AbstractSpliterator实现。 可能最简单的解决方案是编写类似的东西,但是将批量大小限制为trySplit一个元素,并在tryAdvance方法中读取单行。

要做你想做的事,你通常会有一个线程读取行并将它们添加到阻塞队列,第二个线程从这个阻塞队列中获取行并处理它们。

你看错了地方。 您认为一行线将从文件中读取行,但这不是它的工作原理。 你无法告诉底层系统读取一条线,因为在阅读之前没有人知道一条线是什么。

BufferedReader因其字符缓冲区而具有它的名称。 此缓冲区的默认容量为8192.每当请求新行时,将解析缓冲区以获取换行序列,并返回该部分。 当缓冲区没有足够的字符来查找当前行时, 将填充整个缓冲区

现在,填充缓冲区可能会导致请求从底层InputStream读取字节以填充字符解码器的缓冲区。 将要请求的字节数和实际读取的字节数取决于字符解码器的缓冲区大小,实际编码映射到一个字符的字节数以及底层InputStream是否有自己的缓冲区以及它有多大。

实际的昂贵操作是从底层流中读取字节,并且没有从行读取请求到这些读取操作的简单映射。 请求第一行可能导致读取,假设来自底层文件的一个16 KiB块,随后的一百个请求可能从填充的缓冲区提供,并且根本不会导致I / O. 而您对Stream API所做的任何事情都无法改变。 你要并行化的唯一事情就是在缓冲区中搜索新的行字符,这对于并行执行来说太微不足道了。

您可以减少所有相关方的缓冲区大小,以便在处理前一行时粗略地获得一行的预期并行读取,但是,并行执行将永远不会补偿由小缓冲区大小导致的性能下降。