并行化快速排序使其变慢

我正在快速搜索大量数据,为了获得乐趣,我尝试将其并行化以加快排序速度。 但是,在它的当前forms中,由于同步阻塞点,multithreading版本比单线程版本慢。
每次我生成一个线程时,我都会对一个int进行锁定并递增它,并且每次线程完成时我都会再次获得锁定和减少,此外还要检查是否还有任何线程仍在运行(int> 0)。 如果没有,我唤醒我的主线程并使用已排序的数据。

我相信有更好的方法可以做到这一点。 不知道它是什么。 非常感谢帮助。

编辑:我想我没有提供足够的信息。
这是octo-core Opteron上的Java代码。 我无法切换语言。
我正在排序的数量适合内存,并且在调用quicksort时它已经存在于内存中,因此没有理由将其写入磁盘只是将其读回内存。
通过“获取锁定”我的意思是在整数上有一个同步块。

在不了解更多有关实施的地方,我的建议和/或意见如下:

  1. 限制在任何给定时间可以运行的线程数。 Pergaps 8或10(可能为调度程序提供更多余地,尽管最好每个核心/ hw线程放一个 )。 如果亲和力不支持,则在CPU限制问题上运行更multithreading以获得“吞吐量”并不是真的。

  2. 不要靠近树叶! 只有更大的分支上的线程。 没有必要产生一个线程来排序相对较少数量的项目,在这个级别上有许多小分支! 线程会在这里增加更多的相对开销。 (这类似于切换到叶子的“简单排序”)。

  3. 确保每个线程可以独立工作 – 不应该在工作期间踩到另一个线程 – > 没有锁,只是等待连接 。 分而治之。

  4. 可能会考虑执行“广度优先”方法来生成线程。

  5. 考虑快速排序的合并(我偏向于mergesort :-)请记住,有许多不同类型的合并,包括自下而上。

编辑

  1. 确保它确实有效。 请记住正确利用线程之间的内存障碍 – 即使没有两个线程同时修改相同的数据以确保正确的可见性也需要。

编辑(概念validation):

我把这个简单的演示汇总在一起。 在我的Intel Core2 Duo @ 2Ghz上,我可以在大约2/3到3/4的时间内运行,这肯定是一些改进:)(设置:DATA_SIZE = 3000000,MAX_THREADS = 4,MIN_PARALLEL = 1000)。 这是从维基百科中删除的基本就地快速排序代码,它没有利用任何其他基本优化。

它确定一个新线程是否可以/应该启动的方法也非常原始 – 如果没有新线程可用,它只是一直突然(因为,你知道,为什么要等待?)

这段代码也应该(希望)与线程一起广泛分散。 这可能对数据局部性的效率低于保持深度的效率,但如果我的头脑,模型似乎很简单。

执行程序服务还用于简化设计并能够重用相同的线程(与生成新线程相比)。 在执行程序开销显示之前,MIN_PARALLEL可以变得非常小(例如,大约20) – 最大线程数和仅使用新线程 – 如果可能的话也可以保持这一点。

  qsort平均秒数:0.6290541056
 pqsort平均秒数:0.4513915392 

我绝对不保证这段代码的实用性或正确性,但它“似乎在这里工作”。 请注意ThreadPoolExecutor旁边的警告,因为它清楚地表明我不完全确定发生了什么:-) 我相当肯定设计在利用线程方面有些缺陷。

package psq; import java.util.Arrays; import java.util.Random; import java.util.concurrent.*; public class Main { int[] genData (int len) { Random r = new Random(); int[] newData = new int[len]; for (int i = 0; i < newData.length; i++) { newData[i] = r.nextInt(); } return newData; } boolean check (int[] arr) { if (arr.length == 0) { return true; } int lastValue = arr[0]; for (int i = 1; i < arr.length; i++) { //System.out.println(arr[i]); if (arr[i] < lastValue) { return false; } lastValue = arr[i]; } return true; } int partition (int[] arr, int left, int right, int pivotIndex) { // pivotValue := array[pivotIndex] int pivotValue = arr[pivotIndex]; { // swap array[pivotIndex] and array[right] // Move pivot to end int t = arr[pivotIndex]; arr[pivotIndex] = arr[right]; arr[right] = t; } // storeIndex := left int storeIndex = left; // for i from left to right - 1 // left ≤ i < right for (int i = left; i < right; i++) { //if array[i] ≤ pivotValue if (arr[i] <= pivotValue) { //swap array[i] and array[storeIndex] //storeIndex := storeIndex + 1 int t = arr[i]; arr[i] = arr[storeIndex]; arr[storeIndex] = t; storeIndex++; } } { // swap array[storeIndex] and array[right] // Move pivot to its final place int t = arr[storeIndex]; arr[storeIndex] = arr[right]; arr[right] = t; } // return storeIndex return storeIndex; } void quicksort (int[] arr, int left, int right) { // if right > left if (right > left) { // select a pivot index //(eg pivotIndex := left + (right - left)/2) int pivotIndex = left + (right - left) / 2; // pivotNewIndex := partition(array, left, right, pivotIndex) int pivotNewIndex = partition(arr, left, right, pivotIndex); // quicksort(array, left, pivotNewIndex - 1) // quicksort(array, pivotNewIndex + 1, right) quicksort(arr, left, pivotNewIndex - 1); quicksort(arr, pivotNewIndex + 1, right); } } static int DATA_SIZE = 3000000; static int MAX_THREADS = 4; static int MIN_PARALLEL = 1000; // NOTE THAT THE THREAD POOL EXECUTER USES A LINKEDBLOCKINGQUEUE // That is, because it's possible to OVER SUBMIT with this code, // even with the semaphores! ThreadPoolExecutor tp = new ThreadPoolExecutor( MAX_THREADS, MAX_THREADS, Long.MAX_VALUE, TimeUnit.NANOSECONDS, new LinkedBlockingQueue()); // if there are no semaphore available then then we just continue // processing from the same thread and "deal with it" Semaphore sem = new Semaphore(MAX_THREADS, false); class QuickSortAction implements Runnable { int[] arr; int left; int right; public QuickSortAction (int[] arr, int left, int right) { this.arr = arr; this.left = left; this.right = right; } public void run () { try { //System.out.println(">>[" + left + "|" + right + "]"); pquicksort(arr, left, right); //System.out.println("<<[" + left + "|" + right + "]"); } catch (Exception ex) { // I got nothing for this throw new RuntimeException(ex); } } } // pquicksort // threads will [hopefully] fan-out "breadth-wise" // this is because it's likely that the 2nd executer (if needed) // will be submitted prior to the 1st running and starting its own executors // of course this behavior is not terribly well-define void pquicksort (int[] arr, int left, int right) throws ExecutionException, InterruptedException { if (right > left) { // memory barrier -- pquicksort is called from different threads synchronized (arr) {} int pivotIndex = left + (right - left) / 2; int pivotNewIndex = partition(arr, left, right, pivotIndex); Future f1 = null; Future f2 = null; if ((pivotNewIndex - 1) - left > MIN_PARALLEL) { if (sem.tryAcquire()) { f1 = tp.submit(new QuickSortAction(arr, left, pivotNewIndex - 1)); } else { pquicksort(arr, left, pivotNewIndex - 1); } } else { quicksort(arr, left, pivotNewIndex - 1); } if (right - (pivotNewIndex + 1) > MIN_PARALLEL) { if (sem.tryAcquire()) { f2 = tp.submit(new QuickSortAction(arr, pivotNewIndex + 1, right)); } else { pquicksort(arr, pivotNewIndex + 1, right); } } else { quicksort(arr, pivotNewIndex + 1, right); } // join back up if (f1 != null) { f1.get(); sem.release(); } if (f2 != null) { f2.get(); sem.release(); } } } long qsort_call (int[] origData) throws Exception { int[] data = Arrays.copyOf(origData, origData.length); long start = System.nanoTime(); quicksort(data, 0, data.length - 1); long duration = System.nanoTime() - start; if (!check(data)) { throw new Exception("qsort not sorted!"); } return duration; } long pqsort_call (int[] origData) throws Exception { int[] data = Arrays.copyOf(origData, origData.length); long start = System.nanoTime(); pquicksort(data, 0, data.length - 1); long duration = System.nanoTime() - start; if (!check(data)) { throw new Exception("pqsort not sorted!"); } return duration; } public Main () throws Exception { long qsort_duration = 0; long pqsort_duration = 0; int ITERATIONS = 10; for (int i = 0; i < ITERATIONS; i++) { System.out.println("Iteration# " + i); int[] data = genData(DATA_SIZE); if ((i & 1) == 0) { qsort_duration += qsort_call(data); pqsort_duration += pqsort_call(data); } else { pqsort_duration += pqsort_call(data); qsort_duration += qsort_call(data); } } System.out.println("===="); System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9)); System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9)); } public static void main(String[] args) throws Exception { new Main(); } } 

因人而异。 快乐的编码。

(另外,我想知道你的8核盒子上的这个 - 或类似的 - 代码展览会。维基百科声称可以通过cpus数量进行线性加速:)

编辑(更好的数字)

删除使用期货导致轻微的“堵塞”并切换到单个最终等待信号量:减少无用的等待。 现在只运行55%的非线程时间:-)

  qsort平均秒数:0.5999702528
 pqsort平均秒数:0.3346969088 

 package psq; import java.util.Arrays; import java.util.Random; import java.util.concurrent.*; public class Main { int[] genData (int len) { Random r = new Random(); int[] newData = new int[len]; for (int i = 0; i < newData.length; i++) { newData[i] = r.nextInt(); } return newData; } boolean check (int[] arr) { if (arr.length == 0) { return true; } int lastValue = arr[0]; for (int i = 1; i < arr.length; i++) { //System.out.println(arr[i]); if (arr[i] < lastValue) { return false; } lastValue = arr[i]; } return true; } int partition (int[] arr, int left, int right, int pivotIndex) { // pivotValue := array[pivotIndex] int pivotValue = arr[pivotIndex]; { // swap array[pivotIndex] and array[right] // Move pivot to end int t = arr[pivotIndex]; arr[pivotIndex] = arr[right]; arr[right] = t; } // storeIndex := left int storeIndex = left; // for i from left to right - 1 // left ≤ i < right for (int i = left; i < right; i++) { //if array[i] ≤ pivotValue if (arr[i] <= pivotValue) { //swap array[i] and array[storeIndex] //storeIndex := storeIndex + 1 int t = arr[i]; arr[i] = arr[storeIndex]; arr[storeIndex] = t; storeIndex++; } } { // swap array[storeIndex] and array[right] // Move pivot to its final place int t = arr[storeIndex]; arr[storeIndex] = arr[right]; arr[right] = t; } // return storeIndex return storeIndex; } void quicksort (int[] arr, int left, int right) { // if right > left if (right > left) { // select a pivot index //(eg pivotIndex := left + (right - left)/2) int pivotIndex = left + (right - left) / 2; // pivotNewIndex := partition(array, left, right, pivotIndex) int pivotNewIndex = partition(arr, left, right, pivotIndex); // quicksort(array, left, pivotNewIndex - 1) // quicksort(array, pivotNewIndex + 1, right) quicksort(arr, left, pivotNewIndex - 1); quicksort(arr, pivotNewIndex + 1, right); } } static int DATA_SIZE = 3000000; static int MAX_EXTRA_THREADS = 7; static int MIN_PARALLEL = 500; // To get to reducePermits @SuppressWarnings("serial") class Semaphore2 extends Semaphore { public Semaphore2(int permits, boolean fair) { super(permits, fair); } public void removePermit() { super.reducePermits(1); } } class QuickSortAction implements Runnable { final int[] arr; final int left; final int right; final SortState ss; public QuickSortAction (int[] arr, int left, int right, SortState ss) { this.arr = arr; this.left = left; this.right = right; this.ss = ss; } public void run () { try { //System.out.println(">>[" + left + "|" + right + "]"); pquicksort(arr, left, right, ss); //System.out.println("<<[" + left + "|" + right + "]"); ss.limit.release(); ss.countdown.release(); } catch (Exception ex) { // I got nothing for this throw new RuntimeException(ex); } } } class SortState { final public ThreadPoolExecutor pool = new ThreadPoolExecutor( MAX_EXTRA_THREADS, MAX_EXTRA_THREADS, Long.MAX_VALUE, TimeUnit.NANOSECONDS, new LinkedBlockingQueue()); // actual limit: executor may actually still have "active" things to process final public Semaphore limit = new Semaphore(MAX_EXTRA_THREADS, false); final public Semaphore2 countdown = new Semaphore2(1, false); } void pquicksort (int[] arr) throws Exception { SortState ss = new SortState(); pquicksort(arr, 0, arr.length - 1, ss); ss.countdown.acquire(); } // pquicksort // threads "fork" if available. void pquicksort (int[] arr, int left, int right, SortState ss) throws ExecutionException, InterruptedException { if (right > left) { // memory barrier -- pquicksort is called from different threads // and those threads may be created because they are in an executor synchronized (arr) {} int pivotIndex = left + (right - left) / 2; int pivotNewIndex = partition(arr, left, right, pivotIndex); { int newRight = pivotNewIndex - 1; if (newRight - left > MIN_PARALLEL) { if (ss.limit.tryAcquire()) { ss.countdown.removePermit(); ss.pool.submit(new QuickSortAction(arr, left, newRight, ss)); } else { pquicksort(arr, left, newRight, ss); } } else { quicksort(arr, left, newRight); } } { int newLeft = pivotNewIndex + 1; if (right - newLeft > MIN_PARALLEL) { if (ss.limit.tryAcquire()) { ss.countdown.removePermit(); ss.pool.submit(new QuickSortAction(arr, newLeft, right, ss)); } else { pquicksort(arr, newLeft, right, ss); } } else { quicksort(arr, newLeft, right); } } } } long qsort_call (int[] origData) throws Exception { int[] data = Arrays.copyOf(origData, origData.length); long start = System.nanoTime(); quicksort(data, 0, data.length - 1); long duration = System.nanoTime() - start; if (!check(data)) { throw new Exception("qsort not sorted!"); } return duration; } long pqsort_call (int[] origData) throws Exception { int[] data = Arrays.copyOf(origData, origData.length); long start = System.nanoTime(); pquicksort(data); long duration = System.nanoTime() - start; if (!check(data)) { throw new Exception("pqsort not sorted!"); } return duration; } public Main () throws Exception { long qsort_duration = 0; long pqsort_duration = 0; int ITERATIONS = 10; for (int i = 0; i < ITERATIONS; i++) { System.out.println("Iteration# " + i); int[] data = genData(DATA_SIZE); if ((i & 1) == 0) { qsort_duration += qsort_call(data); pqsort_duration += pqsort_call(data); } else { pqsort_duration += pqsort_call(data); qsort_duration += qsort_call(data); } } System.out.println("===="); System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9)); System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9)); } public static void main(String[] args) throws Exception { new Main(); } } 

线程很贵。 如果没有大量数据要排序,请不要使用线程。 或者您可以使用具有更好的并发设计的语言。 例如,Erlang具有非常轻量级的线程,可用于排序。

通过“获取锁定”我的意思是在整数上有一个同步块。 如果我理解正确的话:你锁定了你实际排序的每一个元素,听起来它会非常慢!

听起来你正在产生太多的线程…你没有告诉我们你实际产生了多少个线程,但是如果你每个整数做一个线程那么它几乎肯定会更慢(几乎肯定是轻描淡写)。 你想要做的是产生8个线程,因为你有8个核心,并将你的数组“分区”成8个部分,你将分别快速分配,然后连接就像在原始算法中一样。

以下是如何实现它的一些示例: multithreading快速排序或合并排序