如何使用java并行流而不是executorThreadsPool?

我想编写一个测试,对我的API执行许多并行调用。

ExecutorService executor = Executors.newCachedThreadPool(); final int numOfUsers = 10; for (int i = 0; i  { final Device device1 = getFirstDevice(); final ResponseDto responseDto = devicesServiceLocal.acquireDevice(device1.uuid, 4738); if (responseDto.status == Status.SUCCESS) { successCount.incrementAndGet(); } }); } 

我知道我可以使用executorThreadsPool来做到这一点,如下所示:

 devicesList.parallelStream() .map(device -> do something) 

我可以用java8并行流创建它:

我怎么能在一台设备上做到这一点?

这意味着我很少想要获得相同的设备。

像这样的东西:

 {{device}}.parallelStream().execute(myAction).times(10) 

是的它可以,但……

你会想

 Stream.generate(() -> device) .limit(10) .parallel() .forEach(device -> device.execute()); 

应该做的工作。 但不,因为原因(我真的不知道为什么,没有线索)。 如果我让device.execute()等一下,然后让它打印一些东西。 流每秒打印10次。 所以它根本不是平行的,不是你想要的。

Google是我的朋友,我发现很多文章都警告过parallelStream。 但我的目光落在了http://blog.jooq.org/2014/06/13/java-8-friday-10-subtle-mistakes-when-using-the-streams-api/号码8和9上.8是如果它是由一个集合支持你必须对它进行排序,它会神奇地工作:

 Stream.generate(() -> device) .limit(10) .sorted((a,b)->0) // Sort it (kind of), what?? .parallel() .forEach(device -> device.execute()); 

现在它在一秒钟后打印8次,在另一秒钟后打印2次。 我有8个核心,所以这是我们(有点)的期望。

我在我的流中使用了.forEach() ,但起初我(就像你的例子)使用.map().map()没有打印东西:流从未消耗过(参见链接文章中的9)。

所以,要注意使用流,尤其是并行流。 你必须确定你的流是消耗的,它是有限的( .limit() ),它是并行的,等等。流是很奇怪的,我建议保持你的工作解决方案。

注意:如果device.execute()是一个阻塞操作(IO,网络…),那么你将永远不会超过你的核心数(在我的情况下为8个)同时执行的任务。

更新(感谢Holger ):

霍尔格给出了一个优雅的选择:

 IntStream.range(0,10) .parallel() .mapToObject(i -> getDevice()) .forEach(device -> device.execute()); // Or shorter: IntStream.range(0,10) .parallel() .forEach(i -> getDevice().execute()); 

这就像一个并行的for循环(它的工作原理)。