无法multithreading化可伸缩方法

更新:为了帮助澄清我的要求,我已经发布了一些可以实现这个想法的java代码。

前一段时间我问了一个关于如何得到一个算法来分解一组数字的问题,想法是给它一个数字列表(1,2,3,4,5)和一个总数( 10 )和它将计算出每个数字的所有倍数将加起来总数( '1*10'或’ 1*1,1*2,1*3,1*4 ‘或’ 2*5 ‘等等。 )。 这是我做过的第一次编程练习,所以它花了我一段时间才开始工作,但现在我想试着看看我是否可以扩展它。 原问题中的人说它是可扩展的,但我对如何做到这一点感到困惑。 递归部分是我在缩放组合所有结果的部分时遇到的区域(它所指的表不可扩展但是应用缓存我能够使它快速)

我有以下算法(伪代码):

 //generates table for i = 1 to k for z = 0 to sum: for c = 1 to z / x_i: if T[z - c * x_i][i - 1] is true: set T[z][i] to true //uses table to bring all the parts together function RecursivelyListAllThatWork(k, sum) // Using last k variables, make sum /* Base case: If we've assigned all the variables correctly, list this * solution. */ if k == 0: print what we have so far return /* Recursive step: Try all coefficients, but only if they work. */ for c = 0 to sum / x_k: if T[sum - c * x_k][k - 1] is true: mark the coefficient of x_k to be c call RecursivelyListAllThatWork(k - 1, sum - c * x_k) unmark the coefficient of x_k 

我真的对如何线程/多处理RecursivelyListAllThatWork函数感到茫然。 我知道如果我发送一个较小的K(它是列表中项目总数的int),它将处理该子集但我不知道如何组合整个子集的结果。 例如,如果列表是[1,2,3,4,5,6,7,8,9,10]并且我发送它K = 3那么只有1,2,3得到处理哪个很好但是怎么样如果我需要包含1和10的结果? 我试图修改表(变量T),所以只有我想要的子集在那里,但仍然不起作用,因为,像上面的解决方案,它做了一个子集,但不能处理需要更广范围的答案。

我不需要任何代码,只要有人可以解释如何在概念上打破这个递归步骤,以便可以使用其他核心/机器。

更新:我似乎还无法弄清楚如何将RecursivelyListAllThatWork转换为runnable(我知道技术上如何做,但我不明白如何更改RecursivelyListAllThatWork算法,以便它可以并行运行。其他部分只是在这里使示例工作,我只需要在RecursivelyListAllThatWork方法上实现runnable)。 这是java代码:

 import java.awt.Point; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class main { public static void main(String[] args) { System.out.println("starting.."); int target_sum = 100; int[] data = new int[] { 10, 5, 50, 20, 25, 40 }; List T = tableGeneator(target_sum, data); List coeff = create_coeff(data.length); RecursivelyListAllThatWork(data.length, target_sum, T, coeff, data); } private static List create_coeff(int i) { // TODO Auto-generated method stub Integer[] integers = new Integer[i]; Arrays.fill(integers, 0); List integerList = Arrays.asList(integers); return integerList; } private static void RecursivelyListAllThatWork(int k, int sum, List T, List coeff, int[] data) { // TODO Auto-generated method stub if (k == 0) { //# print what we have so far for (int i = 0; i < coeff.size(); i++) { System.out.println(data[i] + " = " + coeff.get(i)); } System.out.println("*******************"); return; } Integer x_k = data[k-1]; // Recursive step: Try all coefficients, but only if they work. for (int c = 0; c <= sum/x_k; c++) { //the c variable caps the percent if (T.contains(new Point((sum - c * x_k), (k-1)))) { // mark the coefficient of x_k to be c coeff.set((k-1), c); RecursivelyListAllThatWork((k - 1), (sum - c * x_k), T, coeff, data); // unmark the coefficient of x_k coeff.set((k-1), 0); } } } public static List tableGeneator(int target_sum, int[] data) { List T = new ArrayList(); T.add(new Point(0, 0)); float max_percent = 1; int R = (int) (target_sum * max_percent * data.length); for (int i = 0; i < data.length; i++) { for (int s = -R; s < R + 1; s++) { int max_value = (int) Math.abs((target_sum * max_percent) / data[i]); for (int c = 0; c < max_value + 1; c++) { if (T.contains(new Point(s - c * data[i], i))) { Point p = new Point(s, i + 1); if (!T.contains(p)) { T.add(p); } } } } } return T; } } 

multithreading的一般答案是通过堆栈(LIFO或FIFO)去递归递归实现。 当实现这样的算法时,线程的数量是算法的固定参数(例如,核的数量)。

为了实现它,当测试条件结束递归时,语言调用堆栈被存储最后一个上下文作为检查点的堆栈替换。 在您的情况下,它是k=0coeff值匹配目标总和。

在去再生之后,第一个实现是运行多个线程来使用堆栈但是堆栈访问成为争用点,因为它可能需要同步。

更好的可扩展解决方案是为每个线程专用堆栈,但是需要在堆栈中初始生成上下文。

我提出了一种混合方法,其中第一个线程以递归方式处理有限数量的k作为最大递归深度:对于示例中的小数据集为2,但我推荐3如果更大。 然后,第一部分将生成的中间上下文委托给线程池,该线程池将使用非递归实现处理剩余的k 。 此代码不是基于您使用的复杂算法,而是基于相当“基本”的实现:

 import java.util.Arrays; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MixedParallel { // pre-requisite: sorted values !! private static final int[] data = new int[] { 5, 10, 20, 25, 40, 50 }; // Context to store intermediate computation or a solution static class Context { int k; int sum; int[] coeff; Context(int k, int sum, int[] coeff) { this.k = k; this.sum = sum; this.coeff = coeff; } } // Thread pool for parallel execution private static ExecutorService executor; // Queue to collect solutions private static Queue solutions; static { final int numberOfThreads = 2; executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque()); // concurrent because of multi-threaded insertions solutions = new ConcurrentLinkedQueue(); } public static void main(String[] args) { int target_sum = 100; // result vector, init to 0 int[] coeff = new int[data.length]; Arrays.fill(coeff, 0); mixedPartialSum(data.length - 1, target_sum, coeff); executor.shutdown(); // System.out.println("Over. Dumping results"); while(!solutions.isEmpty()) { Context s = solutions.poll(); printResult(s.coeff); } } private static void printResult(int[] coeff) { StringBuffer sb = new StringBuffer(); for (int i = coeff.length - 1; i >= 0; i--) { if (coeff[i] > 0) { sb.append(data[i]).append(" * ").append(coeff[i]).append(" "); } } System.out.println(sb.append("from ").append(Thread.currentThread())); } private static void mixedPartialSum(int k, int sum, int[] coeff) { int x_k = data[k]; for (int c = sum / x_k; c >= 0; c--) { coeff[k] = c; int[] newcoeff = Arrays.copyOf(coeff, coeff.length); if (c * x_k == sum) { //printResult(newcoeff); solutions.add(new Context(0, 0, newcoeff)); continue; } else if (k > 0) { if (data.length - k < 2) { mixedPartialSum(k - 1, sum - c * x_k, newcoeff); // for loop on "c" goes on with previous coeff content } else { // no longer recursive. delegate to thread pool executor.submit(new ComputePartialSum(new Context(k - 1, sum - c * x_k, newcoeff))); } } } } static class ComputePartialSum implements Callable { // queue with contexts to process private Queue contexts; ComputePartialSum(Context request) { contexts = new ArrayDeque(); contexts.add(request); } public Void call() { while(!contexts.isEmpty()) { Context current = contexts.poll(); int x_k = data[current.k]; for (int c = current.sum / x_k; c >= 0; c--) { current.coeff[current.k] = c; int[] newcoeff = Arrays.copyOf(current.coeff, current.coeff.length); if (c * x_k == current.sum) { //printResult(newcoeff); solutions.add(new Context(0, 0, newcoeff)); continue; } else if (current.k > 0) { contexts.add(new Context(current.k - 1, current.sum - c * x_k, newcoeff)); } } } return null; } } } 

您可以检查哪个线程找到了输出结果并检查所有被调用的内容:递归模式下的主线程和上下文堆栈模式下池中的两个线程。

现在,当data.length很高时,这个实现是可扩展的:

  • 最大递归深度限于低级别的主线程
  • 池中的每个线程都使用自己的上下文堆栈,而不会与其他线程争用
  • 现在调整的参数是numberOfThreadsmaxRecursionDepth

所以答案是肯定的,你的算法可以并行化。 这是一个基于您的代码的完全递归实现:

 import java.awt.Point; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class OriginalParallel { static final int numberOfThreads = 2; static final int maxRecursionDepth = 3; public static void main(String[] args) { int target_sum = 100; int[] data = new int[] { 50, 40, 25, 20, 10, 5 }; List T = tableGeneator(target_sum, data); int[] coeff = new int[data.length]; Arrays.fill(coeff, 0); RecursivelyListAllThatWork(data.length, target_sum, T, coeff, data); executor.shutdown(); } private static void printResult(int[] coeff, int[] data) { StringBuffer sb = new StringBuffer(); for (int i = coeff.length - 1; i >= 0; i--) { if (coeff[i] > 0) { sb.append(data[i]).append(" * ").append(coeff[i]).append(" "); } } System.out.println(sb.append("from ").append(Thread.currentThread())); } // Thread pool for parallel execution private static ExecutorService executor; static { executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque()); } private static void RecursivelyListAllThatWork(int k, int sum, List T, int[] coeff, int[] data) { if (k == 0) { printResult(coeff, data); return; } Integer x_k = data[k-1]; // Recursive step: Try all coefficients, but only if they work. for (int c = 0; c <= sum/x_k; c++) { //the c variable caps the percent if (T.contains(new Point((sum - c * x_k), (k-1)))) { // mark the coefficient of x_k to be c coeff[k-1] = c; if (data.length - k != maxRecursionDepth) { RecursivelyListAllThatWork((k - 1), (sum - c * x_k), T, coeff, data); } else { // delegate to thread pool when reaching depth 3 int[] newcoeff = Arrays.copyOf(coeff, coeff.length); executor.submit(new RecursiveThread(k - 1, sum - c * x_k, T, newcoeff, data)); } // unmark the coefficient of x_k coeff[k-1] = 0; } } } static class RecursiveThread implements Callable { int k; int sum; int[] coeff; int[] data; List T; RecursiveThread(int k, int sum, List T, int[] coeff, int[] data) { this.k = k; this.sum = sum; this.T = T; this.coeff = coeff; this.data = data; System.out.println("New job for k=" + k); } public Void call() { RecursivelyListAllThatWork(k, sum, T, coeff, data); return null; } } public static List tableGeneator(int target_sum, int[] data) { List T = new ArrayList(); T.add(new Point(0, 0)); float max_percent = 1; int R = (int) (target_sum * max_percent * data.length); for (int i = 0; i < data.length; i++) { for (int s = -R; s < R + 1; s++) { int max_value = (int) Math.abs((target_sum * max_percent) / data[i]); for (int c = 0; c < max_value + 1; c++) { if (T.contains(new Point(s - c * data[i], i))) { Point p = new Point(s, i + 1); if (!T.contains(p)) { T.add(p); } } } } } return T; } } 

1)而不是

 if k == 0: print what we have so far return 

你可以查看有多少系数是非零的; 如果该计数大于某个阈值(在您的示例中为3),则只是不打印它。 (提示:这与…密切相关

mark the coefficient of x_k to be c

线。)

2)递归函数本质上通常是指数函数,这意味着当你扩展得更高时,运行时会急剧增大。

考虑到这一点,您可以应用multithreading来计算表和递归函数。

在考虑表格时,请考虑循环的哪些部分相互影响,并且必须按顺序进行; 当然,相反的是,找到哪些部分不会相互影响,并且可以并行运行。

至于递归函数,最好的选择可能是将multithreading应用于分支部分。

它们制作这个multithreading的关键只是为了确保你没有不必要的全局数据结构,比如系数上的“标记”。

假设你的表中有K个数字n [0] … n [K-1],你想要达到的总和是S.我假设数组n []从最小到最大的数字排序。

这里有一个简单的枚举算法。 i是数字列表的索引,s是已经构建的当前总和,cs是数字0的系数列表.i – 1:

 function enumerate(i, s, cs): if (s == S): output_solution(cs) else if (i == K): return // dead end else if ((S - s) < n[i]): return // no solution can be found else: for c in 0 .. floor((S - s) / n[i]): // note: floor(...) > 0 enumerate(i + 1, s + c * n[i], append(cs, c)) 

要运行该过程:

  enumerate(0, 0, make_empty_list()) 

现在这里没有全局数据结构了,除了表n [](常量数据),’enumerate’也没有返回任何内容,因此您可以随意更改递归调用以在其自己的线程中运行。 例如,你可以将一个新线程生成一个递归的enumerate()调用,除非你已经运行了太多的线程,在这种情况下你等待。