哪个更快? 更多可运行的工作更少,或者更少的可运行工作? (ExecutorService的)

我试图弄清楚如何从multithreading应用程序中获得最大性能。
我有一个我创建的线程池,如下所示:

ExecutorService executor = Executors.newFixedThreadPool(8); // I have 8 CPU cores. 

我的问题是,我应该将工作分成只有8个runnables / callables,这与线程池中的线程数相同,还是应该将它分成1000000 runnables / callables呢?

 for (int i = 0; i < 1000000; i++) { Callable worker = new MyCallable(); // Each worker does little work. Future submit = executor.submit(worker); } long sum = 0; for (Future future : list) sum += future.get(); // Much more overhead from the for loops 

要么

 for (int i = 0; i < 8; i++) { Callable worker = new MyCallable(); // Each worker does much more work. Future submit = executor.submit(worker); } long sum = 0; for (Future future : list) sum += future.get(); // Negligible overhead from the for loops 

划分为1000000个callable对我来说似乎比较慢,因为实例化所有这些callables并从for循环中收集它们的结果。 另一方面,如果我有8个callables,这个开销可以忽略不计。 而且因为我只有8个线程,所以我不能同时运行1000000个callables,因此没有性能提升。

我是对还是错?

顺便说一句,我可以测试这些情况,但操作非常简单,我想编译器意识到并进行了一些优化。 所以结果可能会产生误导。 我想知道哪种方法更适合像图像处理应用程序。

这个问题有两个方面。

首先,你有技术Java的东西。 正如你对此有一些答案,我将总结这些基础知识:

  • 如果你有N个核心,那么只要每个任务只受CPU约束(即不涉及I / O),N个线程就会给你最好的结果
  • 每个Thread应该完成比任务所需的工作更多的工作,即将N个线程计数到10会慢得多,因为创建和管理额外Threads的开销高于并行计数到10的好处
  • 你需要确保任何同步开销低于正在完成的工作,即让N个Threads调用synchronized增量方法会慢得多
  • Threads确实占用了资源,最常见的是内存。 你拥有的线程越多,估计你的内存使用就越困难,并且可能会影响GC时序(很少见,但我看到它发生了)

其次,你有调度理论。 你需要考虑你的程序在做什么

  • 通常使用Threads来阻止I / O操作。 如果您可以将CPU用于其他任务,则不希望您编程等待网络或HDD
  • 有一些关于日程安排的好书(不记得名字)可以帮助你设计有效的程序。 在您提到的示例中,可能存在额外线程有意义的情况。 例如,如果您的任务没有确定的持续时间,则会出现偏差并且平均响应时间很重要:假设您有2个核心任务和4个任务。 任务A和B每个需要1分钟,但C&D需要10分钟。 如果您运行这些针对2个线程,首先执行C&D,您的总时间将为11分钟,但您的平均响应时间将为(10 + 10 + 11 + 11)/4=10.5分钟。 如果你执行4个线程,那么你的响应时间将是((1 + a)+(1 + a)+(10 + a)+(10 + a))/ 4 = 5.5 + a,其中a是调度等待时间近似。 这是非常理论化的,因为有许多变量没有解释,但可以帮助设计线程程序。 (同样在上面的例子中,因为你在等待Futures你很可能不关心平均响应时间)
  • 使用多个Thread池时必须小心。 使用多个池可能会导致死锁(如果在两个池之间引入依赖关系)并使其难以优化(可以在池之间创建争用并且可能无法获得正确的大小)

– 编辑 –

最后,如果它有所帮助,我对性能的看法是我有4个主要资源:CPU,RAM,磁盘和网络。 我试图找出哪个是我的瓶颈,并使用非饱和资源进行优化。 例如,如果我有大量空闲CPU和低内存,我可能会压缩我的内存数据。 如果我有大量磁盘I / O和大内存,请缓存更多数据。 如果网络资源(不是实际的网络连接)很慢,则使用许multithreading进行并行化。 一旦您在关键路径上使资源类型饱和并且无法使用其他资源来加速它,您就达到了最高性能,并且需要升级H / W以获得更快的结果。

这个问题没有直接的答案,因为它取决于你的代码,应用程序loigc,max,可能的并发,hw等很多东西。

但是在考虑并发时你应该考虑下面的事情,

  1. 每个runnable都需要一个专用于该线程的堆栈,因此如果你创建了大的no。 线程中的线程内存消耗量大于实际应用程序的使用量
  2. 线程应该执行独立且并行的任务。

    找出可以实际并行执行的代码补丁,没有任何依赖性,否则线程无济于事

  3. 什么是硬件配置?

    您可以实现的线程的最大并发执行数等于总数。 的cpu核心。 如果你少了没有。 核心和巨大的没有。 然后切换任务比实际线程更活跃(使用cpu)。 这可能会严重影响性能

总而言之,你的第二种方法看起来对我很好,但如果可能的话,找出更多的并行性,你可以将它扩展到20-30。

也许这段代码有帮助。 它将使用fork-join池计算斐波纳契数。 使用fork-join,我们可以递归地细分问题并组合每个递归级别的结果。 从理论上讲,我们可以在fork-join池中递归到fib(0),但这样效率很低。 因此,我们引入一个递归限制,我们停止细分任务并计算当前任务中的其余部分。 此代码将记录fib(x)所用的时间,并计算n到x的每个fib(n)的单线程时间。 对于每个递归限制,它将平均测量创建的任务数和每次运行的时长。

通常情况下,最佳点是任务大小超过1μs,但是我们这里简单的斐波那契任务几乎不需要内存/缓存。 对于具有更高缓存污染的更多数据密集型任务,交换机更昂贵并且并发任务可能污染共享缓存。

 import java.util.concurrent.*; import java.util.concurrent.atomic.*; public class FibonacciFork extends RecursiveTask { private static final long serialVersionUID = 1L; public FibonacciFork( long n) { super(); this.n = n; } static ForkJoinPool fjp = new ForkJoinPool( Runtime.getRuntime().availableProcessors()); static long fibonacci0( long n) { if ( n < 2) { return n; } return fibonacci0( n - 1) + fibonacci0( n - 2); } static int rekLimit = 8; private static long stealCount; long n; private long forkCount; private static AtomicLong forks = new AtomicLong( 0); static class Result { long durMS; int rekLimit; } public static void main( String[] args) { int fiboArg = 49; BenchLogger.sysinfo( "Warmup"); long singleNS[] = getSingleThreadNanos( 20, 5e9); BenchLogger.sysinfo( "Warmup complete"); singleNS = getSingleThreadNanos( fiboArg, 1e9); BenchLogger.sysinfo( "Single Thread Times complete"); Result[] results = new Result[ fiboArg + 1]; for ( int rekLimit = 2; rekLimit <= fiboArg; rekLimit++) { results[ rekLimit] = new Result(); runWithRecursionLimit( rekLimit, fiboArg, singleNS[ rekLimit], results[ rekLimit]); } System.out.println( "CSV results for Fibo " + fiboArg + "\n" + "RekLimit\t" + "Jobs ns\t" + "time ms"); for ( int rekLimit = 2; rekLimit <= fiboArg; rekLimit++) { System.out.println( rekLimit + "\t" + singleNS[ rekLimit] + "\t" + results[ rekLimit].durMS); } } private static long[] getSingleThreadNanos( final int n, final double minRuntimeNS) { final long timesNS[] = new long[ n + 1]; ExecutorService es = Executors.newFixedThreadPool( Math.max( 1, Runtime.getRuntime().availableProcessors() / 8)); for ( int i = 2; i <= n; i++) { final int arg = i; Runnable runner = new Runnable() { @Override public void run() { long start = System.nanoTime(); long result = fibonacci0( arg); long end = System.nanoTime(); double durNS = end - start; long ntimes = 1; double fact = 1; while ( durNS < minRuntimeNS) { long oldNTimes = ntimes; if ( durNS > 0) { ntimes = Math.max( 1, ( long) ( oldNTimes * fact * minRuntimeNS / durNS)); } else { ntimes *= 2; } start = System.nanoTime(); for ( long i = 0; i < ntimes; i++) { result = fibonacci0( arg); } end = System.nanoTime(); durNS = end - start; fact *= 1.1; } timesNS[ arg] = ( long) ( durNS / ntimes); System.out.println( "Single Fib(" + arg + ")=" + result + " in " + ( timesNS[ arg] / 1e6) + "ms (" + ntimes + " loops in " + (durNS / 1e6) + " ms)"); } }; es.execute( runner); } es.shutdown(); try { es.awaitTermination( 1, TimeUnit.HOURS); } catch ( InterruptedException e) { BenchLogger.sysinfo( "Single Timeout"); } return timesNS; } private static void runWithRecursionLimit( int r, int arg, long singleThreadNanos, Result result) { rekLimit = r; long start = System.currentTimeMillis(); long fiboResult = fibonacci( arg); long end = System.currentTimeMillis(); // Steals zählen long currentSteals = fjp.getStealCount(); long newSteals = currentSteals - stealCount; stealCount = currentSteals; long forksCount = forks.getAndSet( 0); final long durMS = end-start; System.out.println( "Fib(" + arg + ")=" + fiboResult + " in " + durMS + "ms, recursion limit: " + r + " at " + ( singleThreadNanos / 1e6) + "ms, steals: " + newSteals + " forks " + forksCount); result.durMS = durMS; result.rekLimit = r; } static long fibonacci( final long arg) { FibonacciFork task = new FibonacciFork( arg); long result = fjp.invoke( task); forks.set( task.forkCount); return result; } @Override protected Long compute() { if ( n <= rekLimit) { return fibonacci0( n); } FibonacciFork ff1 = new FibonacciFork( n-1); FibonacciFork ff2 = new FibonacciFork( n-2); ff1.fork(); long r2 = ff2.compute(); long r1 = ff1.join(); forkCount = ff2.forkCount + ff1.forkCount + 1; return r1 + r2; } }