当Java 8 Stream抛出RuntimeException时,预期的行为是什么?
在流处理期间遇到RuntimeException
时,流处理是否应该中止? 应该先完成吗? 是否应该在Stream.close()
上Stream.close()
抛出exception? exception是否被重新抛出或被包裹? Stream
的JavaDoc和java.util.stream包没什么好说的。
我发现的所有关于Stackoverflow的问题似乎都集中在如何从function界面中包装已检查的exception以便编译它们的代码。 事实上,互联网上的博文和类似文章都集中在同一个警告上。 这不是我关心的问题。
我根据自己的经验知道,一旦抛出RuntimeException
, 顺序流的处理就会中止,并且这个exception会按原样重新抛出。 仅当客户端线程抛出exception时,对于并行流才是相同的。
但是, 此处的示例代码演示了如果在并行流处理期间由“工作线程”(=与调用终端操作的线程不同的线程)抛出exception,则此exception将永远丢失并且流处理完成。
示例代码将首先并行运行IntStream
。 然后是“正常” Stream
并行。
这个例子将表明,
1)如果遇到RuntimeException, IntStream
在中止并行处理方面没有问题。 exception被重新抛出,包装在另一个RuntimeException中。
2) Stream
不好玩。 实际上,客户端线程永远不会看到抛出RuntimeException的痕迹。 流不仅完成处理; 将处理更多元素而不是指定的limit()
!
在示例代码中,使用IntStream.range()生成IntStream 。 “普通” Stream
没有“范围”的概念,而是由1:s组成,但调用Stream.limit()将流限制为10亿个元素。
这是另一个转折点。 生成IntStream的示例代码执行如下操作:
IntStream.range(0, 1_000_000_000).parallel().forEach(..)
将其更改为生成的流,就像代码中的第二个示例一样:
IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)
此IntStream的结果是相同的:exception被包装并重新抛出,处理中止。 但是,第二个流现在也将包装并重新抛出exception,而不是处理超出限制的元素! 因此:更改第一个流的生成方式会对第二个流的行为产生副作用。 对我来说,这很奇怪。
ForkJoinPool.invoke()和ForkJoinTask
JavaDoc表示exception被重新抛出,这是我对并行流的期望。
背景
当处理从Collection.stream().parallel()
获取的并行流中的元素时,我遇到了这个“问题”(我没有validationCollection.parallelStream()
的行为,但它应该是相同的)。 发生的事情是“工作线程”崩溃然后静静地离开,而所有其他线程成功完成了流。 我的应用程序使用默认的exception处理程序将exception写入日志文件。 但是甚至没有创建这个日志文件。 线程和他的例外根本就消失了。 由于我需要在捕获运行时exception时立即中止,因此一种替代方法是编写将此exception泄漏给其他工作程序的代码,使其在任何其他线程抛出exception时不愿意继续。 当然,这并不能保证流实现只是继续生成尝试完成流的新线程。 所以我可能最终不会使用并行流,而是使用线程池/执行器进行“正常”并发编程。
这表明运行时exception丢失的问题不会与Stream.generate()
或使用Stream.limit()
生成的流隔离。 最重要的是,我很想知道…是预期的行为?
关于exception报告,这两个流的行为没有区别,问题是您将两个测试一个接一个地放入一个方法中,并让它们访问共享数据结构。
有一个微妙的,可能没有充分记录(如果有意)的行为:当流操作exception完成时,它不会等待所有并发操作的完成。
因此,当您捕获第一个流操作的exception时,仍有一些线程正在运行并访问您的共享数据。 因此,当您重置AtomicBoolean
,属于第一个作业的其中一个线程将读取false
值,将其变为true
,打印消息并抛出一个丢失的exception,因为流操作已经exception完成。 此外,这些线程中的一些将在您重置后提升您的计数器,这就是为什么它的数量高于第二个作业允许的数量。 您的第二个作业没有exception完成,因为属于第二个作业的所有线程都将从AtomicBoolean
读取一个true
值。
有一些方法可以发现这一点。
当您删除第一个流操作时,第二个将按预期exception完成。 另外,插入语句
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
两个流操作之间将解决问题,因为它等待所有线程的完成。
但是,更干净的解决方案是让两个流操作都使用自己的计数器和标志。
也就是说,如果你只是交换这两个操作,那么存在一个微妙的,依赖于实现的差异导致问题消失。 IntStream.range
操作生成具有已知大小的流,这允许将其拆分为并发任务,这些任务本质上知道要处理多少元素。 这允许在如上所述的特殊情况下放弃这些任务。 另一方面,组合由generate
和limit
返回的无限流不会产生大小的流(尽管这是可能的)。 由于这种流被视为具有未知大小,因此子任务必须在计数器上同步以确保遵守该限制。 这导致子任务(有时)完成,即使在特殊情况下也是如此。 但正如所说,这是实施细节的副作用,而非故意等待完成。 因为它是关于并发性的,所以如果你多次运行它,结果可能会有所不同。