如何设计可能使用skip的返回流

我创建了一个解析库,它接受提供的输入并返回记录流。 然后程序调用该库并处理结果。 就我而言,我的程序正在使用类似的东西

recordStream.forEach(r -> insertIntoDB(r)); 

可以提供给解析库的输入类型之一是平面文件,其可以具有标题行。 因此,可以将解析库配置为跳过标题行。 如果配置了标题行,则会向返回添加skip(n)元素,例如

 Files.lines(input)**.skip(1)**.parallel().map(r -> createRecord(r)); 

解析库返回生成的Stream。

但是,似乎skip,parallel和forEach不能很好地结合在一起最终程序员必须调用forEachOrdered,但将这个要求放在程序员身上是不好的设计,期望他们知道如果处理输入类型他们必须使用forEachOrdered带有标题行的文件。

如何在返回的流链的构造中自己强制执行有序需求,以将完整function的流返回给程序编写器,而不是具有隐藏限制的流? 答案是将流包装在另一个流中吗?

forEachOrdered是必要的,不是因为skip() ,而是因为你的Stream是并行的。 即使流是并行的,流也将跳过第一个元素,如文档中所示:

虽然skip()通常是顺序流管道上的廉价操作,但在有序并行流水线上它可能非常昂贵,特别是对于大的n值,因为skip(n)被约束为不仅跳过任何n个元素,而是前n个遇到订单中的元素。

有明确记载, forEach不一定尊重订单。 当您关心订单时不使用forEachOrdered只是滥用Stream API:

此操作的行为明确是不确定的。 对于并行流管道,此操作不保证遵守流的遭遇顺序,因为这样做会牺牲并行性的好处。

我不会从库中返回并行流。 我会返回一个顺序的(其中forEach会尊重命令),让调用者调用parallel()并假设后果如果它想要。

默认情况下使用并行流是一个坏主意 。

考虑相关场景

  • 使用skip设置流源
  • 客户端代码正在请求parallel()执行
  • 客户端代码链接一个无序的终端操作,如forEach
  • 代码运行在早于1.8u60的JRE上

我们有非常特殊的情况组合,所有这些都在特定库函数的控制之外,它将链接.map(r -> createRecord(r))操作。

我认为责任不在于此。 嗯,一般来说,应用程序代码不负责修复已被识别为JRE错误并在最新版本中修复的内容。

如果由于某种原因你认为有必要为旧的JRE提供解决方案,那么需要skip操作的流源才能做到这一点。

对于这个具体案例,并不是那么难。 您可以直接创建BufferedReader ,调用readLine()跳过第一行,然后返回lines()的结果,这允许处理所有剩余的行。 作为带有skip操作的并行Stream,这可能更有效。

更通用的解决方案是这样的“急切先跳”操作:

 public static  Stream skipFirstImmediately(Stream source) { Spliterator sp=source.spliterator(); sp.tryAdvance(skipped -> {}); return StreamSupport.stream(sp, source.isParallel()); } 

请注意,在使用此方法时,由于当前Stream实现的属性,如果需要并行执行,在调用此方法之前将源Stream转换为并行而不是将生成的Stream转换为并行是有益的。

这可以通过比较输出来validation

 skipFirstImmediately(IntStream.range(0, 10).parallel().boxed()) .peek(x -> System.out.println(Thread.currentThread())) .forEach(System.out::println); 

 skipFirstImmediately(IntStream.range(0, 10).boxed()).parallel() .peek(x -> System.out.println(Thread.currentThread())) .forEach(System.out::println); 

这在任何一种情况下都是正确的,但在后者中没有利用SMPfunction。