stream和parallelStream
我有一个这样的测试代码:
List list = new ArrayList(1000000); for(int i=0;i<1000000;i++){ list.add(i); } List values = new ArrayList(1000000); list.stream().forEach( i->values.add(new Date().toString()) ); System.out.println(values.size());
运行这个,我得到了正确的输出:1000000。
但是,如果我将stream()
更改为parallelStream()
,如下所示:
list.parallelStream().forEach( i->values.add(new Date().toString()) );
我有一个随机输出,例如:920821。
怎么了?
ArrayList
未同步。 没有定义尝试同时向其添加元素。 从forEach
:
对于并行流管道,此操作不保证遵守流的遭遇顺序,因为这样做会牺牲并行性的好处。 对于任何给定元素,可以在任何时间以及库选择的任何线程中执行该动作 。
在第二个示例中,最终会有多个线程同时在数组列表上调用add
,而ArrayList
文档说:
请注意,此实现不同步。 如果多个线程同时访问ArrayList实例,并且至少有一个线程在结构上修改了列表,则必须在外部进行同步。
错误的解决方案
如果将ArrayList
的使用更改为Vector
,则会得到正确的结果,因为此列表实现是同步的。 它的Javadoc说:
与新的集合实现不同,
Vector
是同步的。
但是, 不要使用它! 此外,由于显式同步,它可能最终变慢。
正确的方法
通过使用collect
方法,明确地避免Stream API提供可变缩减范例的这种情况。 下列
List values = list.stream().map(i -> "foo").collect(Collectors.toList());
将始终提供正确的结果,无论是否并行运行。 Stream管道在内部处理并发性,并保证在并行流的collect操作中使用非并发收集器是安全的 。 Collectors.toList()
是一个内置的收集器,将Stream的元素累积到列表中。
使用消费者,您必须担心线程安全。 一个更简单的解决方案是让Stream API累积结果。
List values = IntStream.range(0, 1_000_000).parallel() .mapToObj(i -> new Date().toString()) .collect(Collectors.toList());
避免使用像Vector这样的线程安全收集器的一个关键原因是它要求每个线程获得共享锁是一个瓶颈,即你将花时间获取和释放锁,并且一次只有一个线程可以访问它。 您可以轻松地获得比单独使用一个线程更慢的解决方案。
values.add(String)
不是线程安全的。 当您从不同的线程调用此方法而没有同步时,不能保证它将按预期工作。
要解决这个问题,您可以:
- 使用像
Vector
或CopyOnWriteArrayLis
这样的线程安全集合。 - 明确同步您的代码。 例如,将
synchronize(this){values.add(new Date().toString())}
放入代码中。 注意i->
是外部同步块 - 或者在这种情况下,地图元素获取新流,如@PeterLawrey,答案:
IntStream.range(0, 1_000_000).parallel().mapToObj(i -> new Date().toString()).collect(Collectors.toList());