Java:通过multithreading并行化快速排序

我正在尝试在Java中并行化算法。 我从合并排序开始,并在这个问题上发布了我的尝试。 我修改过的尝试是在下面的代码中,我现在尝试并行快速排序。

我的multithreading实现或解决此问题的方法是否有任何新手错误? 如果不是,我不应该期望在双核上的顺序算法和并行算法之间的速度增加超过32%(参见底部的时间)?

这是multithreading算法:

public class ThreadedQuick extends Thread { final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); CountDownLatch doneSignal; static int num_threads = 1; int[] my_array; int start, end; public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) { this.my_array = array; this.start = start; this.end = end; this.doneSignal = doneSignal; } public static void reset() { num_threads = 1; } public void run() { quicksort(my_array, start, end); doneSignal.countDown(); num_threads--; } public void quicksort(int[] array, int start, int end) { int len = end-start+1; if (len <= 1) return; int pivot_index = medianOfThree(array, start, end); int pivotValue = array[pivot_index]; swap(array, pivot_index, end); int storeIndex = start; for (int i = start; i < end; i++) { if (array[i] <= pivotValue) { swap(array, i, storeIndex); storeIndex++; } } swap(array, storeIndex, end); if (num_threads < MAX_THREADS) { num_threads++; CountDownLatch completionSignal = new CountDownLatch(1); new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start(); quicksort(array, storeIndex + 1, end); try { completionSignal.await(1000, TimeUnit.SECONDS); } catch(Exception ex) { ex.printStackTrace(); } } else { quicksort(array, start, storeIndex - 1); quicksort(array, storeIndex + 1, end); } } } 

以下是我开始的方式:

 ThreadedQuick.reset(); CountDownLatch completionSignal = new CountDownLatch(1); new ThreadedQuick(completionSignal, array, 0, array.length-1).start(); try { completionSignal.await(1000, TimeUnit.SECONDS); } catch(Exception ex){ ex.printStackTrace(); } 

我针对Arrays.sort和类似的顺序快速排序算法对此进行了测试。 以下是intel duel-core dell笔记本电脑的计时结果:

元素:500,000,顺序:0.068592,线程:0.046871,Arrays.sort:0.079677

元素:1,000,000,顺序:0.14416,线程:0.095492,Arrays.sort:0.167155

元素:2,000,000,顺序:0.301666,线程:0.205719,Arrays.sort:0.350982

元素:4,000,000,顺序:0.623291,线程:0.424119,Arrays.sort:0.712698

元素:8,000,000,顺序:1.279374,螺纹:0.859363,Arrays.sort:1.487671

上面的每个数字是100次测试的平均时间,抛出3个最低和3个最高的情况。 我使用Random.nextInt(Integer.MAX_VALUE)为每个测试生成一个数组,每10次测试使用相同的种子初始化一次。 每个测试包括使用System.nanoTime对给定算法进行计时。 平均后我四舍五入到小数点后六位。 显然,我确实检查了每种是否有效

如您所见,在每组测试中,顺序和线程案例之间的速度提高了约32%。 正如我上面提到的,我不应该期待更多吗?

将numThreads设为静态可能会导致问题,很可能最终会在某些时候运行MAX_THREADS以上。

可能你没有在性能上完全翻倍的原因是你的快速排序无法完全并行化。 请注意,对quicksort的第一次调用将在初始线程中开始真正并行运行之前传递整个数组。 在进行分离线程时,以上下文切换和模式转换的forms并行化算法也存在开销。

看看Fork / Join框架,这个问题可能非常适合那里。

关于实施的几点意见。 实现Runnable而不是扩展Thread。 只有在创建一些新版本的Thread类时,才应使用扩展线程。 当你只想做一些并行运行的工作时,你最好使用Runnable。 在运行Runnable的同时,您还可以扩展另一个类,从而为OO设计提供更大的灵活性。 使用仅限于系统中可用线程数的线程池。 也不要使用numThreads来决定是否分叉新线程。 您可以预先计算出来。 使用最小分区大小,即总arrays的大小除以可用的处理器数。 就像是:

 public class ThreadedQuick implements Runnable { public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS); final int[] my_array; final int start, end; private final int minParitionSize; public ThreadedQuick(int minParitionSize, int[] array, int start, int end) { this.minParitionSize = minParitionSize; this.my_array = array; this.start = start; this.end = end; } public void run() { quicksort(my_array, start, end); } public void quicksort(int[] array, int start, int end) { int len = end - start + 1; if (len <= 1) return; int pivot_index = medianOfThree(array, start, end); int pivotValue = array[pivot_index]; swap(array, pivot_index, end); int storeIndex = start; for (int i = start; i < end; i++) { if (array[i] <= pivotValue) { swap(array, i, storeIndex); storeIndex++; } } swap(array, storeIndex, end); if (len > minParitionSize) { ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1); Future future = executor.submit(quick); quicksort(array, storeIndex + 1, end); try { future.get(1000, TimeUnit.SECONDS); } catch (Exception ex) { ex.printStackTrace(); } } else { quicksort(array, start, storeIndex - 1); quicksort(array, storeIndex + 1, end); } } } 

你可以这样做:

 ThreadedQuick quick = new ThreadedQuick(array / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1); quick.run(); 

这将在同一个线程中启动排序,这可以避免在启动时出现不必要的线程跳转。

警告:不确定上面的实现会更快,因为我没有对它进行基准测试。

它使用快速排序和合并排序的组合。

 import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ParallelSortMain { public static void main(String... args) throws InterruptedException { Random rand = new Random(); final int[] values = new int[100*1024*1024]; for (int i = 0; i < values.length; i++) values[i] = rand.nextInt(); int threads = Runtime.getRuntime().availableProcessors(); ExecutorService es = Executors.newFixedThreadPool(threads); int blockSize = (values.length + threads - 1) / threads; for (int i = 0; i < values.length; i += blockSize) { final int min = i; final int max = Math.min(min + blockSize, values.length); es.submit(new Runnable() { @Override public void run() { Arrays.sort(values, min, max); } }); } es.shutdown(); es.awaitTermination(10, TimeUnit.MINUTES); for (int blockSize2 = blockSize; blockSize2 < values.length / 2; blockSize2 *= 2) { for (int i = 0; i < values.length; i += blockSize2) { final int min = i; final int mid = Math.min(min + blockSize2, values.length); final int max = Math.min(min + blockSize2 * 2, values.length); mergeSort(values, min, mid, max); } } } private static boolean mergeSort(int[] values, int left, int mid, int end) { int[] results = new int[end - left]; int l = left, r = mid, m = 0; for (; l < left && r < mid; m++) { int lv = values[l]; int rv = values[r]; if (lv < rv) { results[m] = lv; l++; } else { results[m] = rv; r++; } } while (l < mid) results[m++] = values[l++]; while (r < end) results[m++] = values[r++]; System.arraycopy(results, 0, values, left, results.length); return false; } } 

如果我理解你的代码,请注意几点评论:

  1. 我没有看到numthreads对象周围的锁,即使它可以通过多个线程访问。 也许你应该把它变成AtomicInteger。

  2. 使用线程池并安排任务,即对quicksort的单个调用,以利用线程池的优势。 使用期货。

你当前按照你正在做的方式划分事物的方法可以留下一个较小的分区,一个线程和一个较大的分区,没有一个线程。 也就是说,它没有使用自己的线程对较大的段进行优先级排序。