如何设计可能使用skip的返回流
我创建了一个解析库,它接受提供的输入并返回记录流。 然后程序调用该库并处理结果。 就我而言,我的程序正在使用类似的东西
recordStream.forEach(r -> insertIntoDB(r));
可以提供给解析库的输入类型之一是平面文件,其可以具有标题行。 因此,可以将解析库配置为跳过标题行。 如果配置了标题行,则会向返回添加skip(n)元素,例如
Files.lines(input)**.skip(1)**.parallel().map(r -> createRecord(r));
解析库返回生成的Stream。
但是,似乎skip,parallel和forEach不能很好地结合在一起最终程序员必须调用forEachOrdered,但将这个要求放在程序员身上是不好的设计,期望他们知道如果处理输入类型他们必须使用forEachOrdered带有标题行的文件。
如何在返回的流链的构造中自己强制执行有序需求,以将完整function的流返回给程序编写器,而不是具有隐藏限制的流? 答案是将流包装在另一个流中吗?
forEachOrdered
是必要的,不是因为skip()
,而是因为你的Stream是并行的。 即使流是并行的,流也将跳过第一个元素,如文档中所示:
虽然skip()通常是顺序流管道上的廉价操作,但在有序并行流水线上它可能非常昂贵,特别是对于大的n值,因为skip(n)被约束为不仅跳过任何n个元素,而是前n个遇到订单中的元素。
有明确记载, forEach
不一定尊重订单。 当您关心订单时不使用forEachOrdered
只是滥用Stream API:
此操作的行为明确是不确定的。 对于并行流管道,此操作不保证遵守流的遭遇顺序,因为这样做会牺牲并行性的好处。
我不会从库中返回并行流。 我会返回一个顺序的(其中forEach会尊重命令),让调用者调用parallel()
并假设后果如果它想要。
默认情况下使用并行流是一个坏主意 。
考虑相关场景
- 使用
skip
设置流源 - 客户端代码正在请求
parallel()
执行 - 客户端代码链接一个无序的终端操作,如
forEach
- 代码运行在早于
1.8u60
的JRE上
我们有非常特殊的情况组合,所有这些都在特定库函数的控制之外,它将链接.map(r -> createRecord(r))
操作。
我认为责任不在于此。 嗯,一般来说,应用程序代码不负责修复已被识别为JRE错误并在最新版本中修复的内容。
如果由于某种原因你认为有必要为旧的JRE提供解决方案,那么需要skip
操作的流源才能做到这一点。
对于这个具体案例,并不是那么难。 您可以直接创建BufferedReader
,调用readLine()
跳过第一行,然后返回lines()
的结果,这允许处理所有剩余的行。 作为带有skip
操作的并行Stream,这可能更有效。
更通用的解决方案是这样的“急切先跳”操作:
public static Stream skipFirstImmediately(Stream source) { Spliterator sp=source.spliterator(); sp.tryAdvance(skipped -> {}); return StreamSupport.stream(sp, source.isParallel()); }
请注意,在使用此方法时,由于当前Stream实现的属性,如果需要并行执行,在调用此方法之前将源Stream转换为并行而不是将生成的Stream转换为并行是有益的。
这可以通过比较输出来validation
skipFirstImmediately(IntStream.range(0, 10).parallel().boxed()) .peek(x -> System.out.println(Thread.currentThread())) .forEach(System.out::println);
和
skipFirstImmediately(IntStream.range(0, 10).boxed()).parallel() .peek(x -> System.out.println(Thread.currentThread())) .forEach(System.out::println);
这在任何一种情况下都是正确的,但在后者中没有利用SMPfunction。