当Java 8 Stream抛出RuntimeException时,预期的行为是什么?

在流处理期间遇到RuntimeException时,流处理是否应该中止? 应该先完成吗? 是否应该在Stream.close()Stream.close()抛出exception? exception是否被重新抛出或被包裹? Stream的JavaDoc和java.util.stream包没什么好说的。

我发现的所有关于Stackoverflow的问题似乎都集中在如何从function界面中包装已检查的exception以便编译它们的代码。 事实上,互联网上的博文和类似文章都集中在同一个警告上。 这不是我关心的问题。

我根据自己的经验知道,一旦抛出RuntimeException顺序流的处理就会中止,并且这个exception会按原样重新抛出。 仅当客户端线程抛出exception时,对于并行流才是相同的。

但是, 此处的示例代码演示了如果在并行流处理期间由“工作线程”(=与调用终端操作的线程不同的线程)抛出exception,则此exception将永远丢失并且流处理完成。

示例代码将首先并行运行IntStream 。 然后是“正常” Stream并行。

这个例子将表明,

1)如果遇到RuntimeException, IntStream在中止并行处理方面没有问题。 exception被重新抛出,包装在另一个RuntimeException中。

2) Stream不好玩。 实际上,客户端线程永远不会看到抛出RuntimeException的痕迹。 流不仅完成处理; 将处理更多元素而不是指定的limit()

在示例代码中,使用IntStream.range()生成IntStream 。 “普通” Stream没有“范围”的概念,而是由1:s组成,但调用Stream.limit()将流限制为10亿个元素。

这是另一个转折点。 生成IntStream的示例代码执行如下操作:

 IntStream.range(0, 1_000_000_000).parallel().forEach(..) 

将其更改为生成的流,就像代码中的第二个示例一样:

 IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..) 

此IntStream的结果是相同的:exception被包装并重新抛出,处理中止。 但是,第二个流现在也将包装并重新抛出exception,而不是处理超出限制的元素! 因此:更改第一个流的生成方式会对第二个流的行为产生副作用。 对我来说,这很奇怪。

ForkJoinPool.invoke()和ForkJoinTask JavaDoc表示exception被重新抛出,这是我对并行流的期望。

背景

当处理从Collection.stream().parallel()获取的并行流中的元素时,我遇到了这个“问题”(我没有validationCollection.parallelStream()的行为,但它应该是相同的)。 发生的事情是“工作线程”崩溃然后静静地离开,而所有其他线程成功完成了流。 我的应用程序使用默认的exception处理程序将exception写入日志文件。 但是甚至没有创建这个日志文件。 线程和他的例外根本就消失了。 由于我需要在捕获运行时exception时立即中止,因此一种替代方法是编写将此exception泄漏给其他工作程序的代码,使其在任何其他线程抛出exception时不愿意继续。 当然,这并不能保证流实现只是继续生成尝试完成流的新线程。 所以我可能最终不会使用并行流,而是使用线程池/执行器进行“正常”并发编程。

这表明运行时exception丢失的问题不会与Stream.generate()或使用Stream.limit()生成的流隔离。 最重要的是,我很想知道…是预期的行为?

关于exception报告,这两个流的行为没有区别,问题是您将两个测试一个接一个地放入一个方法中,并让它们访问共享数据结构。

有一个微妙的,可能没有充分记录(如果有意)的行为:当流操作exception完成时,它不会等待所有并发操作的完成。

因此,当您捕获第一个流操作的exception时,仍有一些线程正在运行并访问您的共享数据。 因此,当您重置AtomicBoolean ,属于第一个作业的其中一个线程将读取false值,将其变为true ,打印消息并抛出一个丢失的exception,因为流操作已经exception完成。 此外,这些线程中的一些将您重置提升您的计数器,这就是为什么它的数量高于第二个作业允许的数量。 您的第二个作业没有exception完成,因为属于第二个作业的所有线程都将从AtomicBoolean读取一个true值。

有一些方法可以发现这一点。

当您删除第一个流操作时,第二个将按预期exception完成。 另外,插入语句

 ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS); 

两个流操作之间将解决问题,因为它等待所有线程的完成。

但是,更干净的解决方案是让两个流操作都使用自己的计数器和标志。

也就是说,如果你只是交换这两个操作,那么存在一个微妙的,依赖于实现的差异导致问题消失。 IntStream.range操作生成具有已知大小的流,这允许将其拆分为并发任务,这些任务本质上知道要处理多少元素。 这允许在如上所述的特殊情况下放弃这些任务。 另一方面,组合由generatelimit返回的无限流不会产生大小的流(尽管这是可能的)。 由于这种流被视为具有未知大小,因此子任务必须在计数器上同步以确保遵守该限制。 这导致子任务(有时)完成,即使在特殊情况下也是如此。 但正如所说,这是实施细节的副作用,而非故意等待完成。 因为它是关于并发性的,所以如果你多次运行它,结果可能会有所不同。