How to demonstrate work stealing in the Java Fork/Join framework?

I would like to improve my small fork/join example so that it shows work stealing taking place while the Java Fork/Join framework executes it.

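What changes do I need to make to the code? Purpose of the example: a simple linear search that counts occurrences of a value, with the work divided among multiple threads.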

    package com.stackoverflow.questions;

    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    public class CounterFJ<T extends Comparable<T>> extends RecursiveTask<Integer> {

        private static final long serialVersionUID = 5075739389907066763L;
        private static final int THRESHOLD = 20;

        private List<T> _list;
        private T _test;
        private int _lastCount = -1;
        private int _start;
        private int _end;
        private int _divideFactor = 4;

        public CounterFJ(List<T> list, T test, int start, int end, int factor) {
            _list = list;
            _test = test;
            _start = start;
            _end = end;
            _divideFactor = factor;
        }

        public CounterFJ(List<T> list, T test, int factor) {
            this(list, test, 0, list.size(), factor);
        }

        @Override
        protected Integer compute() {
            // Small enough range: count the matches sequentially.
            if (_end - _start < THRESHOLD) {
                int count = 0;
                for (int i = _start; i < _end; i++) {
                    if (_list.get(i).compareTo(_test) == 0) {
                        count++;
                    }
                }
                _lastCount = count;
                return count;
            }

            // Otherwise split the range into _divideFactor subtasks.
            List<CounterFJ<T>> taskList = new LinkedList<>();
            int step = (_end - _start) / _divideFactor;
            for (int j = 0; j < _divideFactor; j++) {
                int from = _start + step * j;
                // The last subtask takes whatever remains up to _end.
                int to = (j == _divideFactor - 1) ? _end : from + step;
                taskList.add(new CounterFJ<>(_list, _test, from, to, _divideFactor));
            }
            // Runs all subtasks and returns once every one of them is done.
            invokeAll(taskList);

            _lastCount = 0;
            for (CounterFJ<T> task : taskList) {
                _lastCount += task.join();
            }
            return _lastCount;
        }

        public int getResult() {
            return _lastCount;
        }

        public static void main(String[] args) {
            List<Long> list = new LinkedList<>();
            long range = 200;
            Random r = new Random(42);
            for (int i = 0; i < 1000; i++) {
                list.add((long) (r.nextDouble() * range));
            }
            CounterFJ<Long> counter = new CounterFJ<>(list, 100L, 4);
            ForkJoinPool pool = new ForkJoinPool();
            long time = System.currentTimeMillis();
            pool.invoke(counter);
            System.out.println("Fork join counter in " + (System.currentTimeMillis() - time));
            System.out.println("Occurrences:" + counter.getResult());
        }
    }

In the end I worked out how to do it, and it is not difficult, so I am leaving the solution here for future readers.

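In the constructor of the RecursiveTask, save the thread that created the instance. In the compute method, check whether the executing thread is the same one. If it is not, the task has been stolen.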

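So I added these member variables:

    private long _threadId = -1;
    // Incremented from several worker threads at once, so an AtomicInteger
    // (java.util.concurrent.atomic) is used instead of a plain static int.
    private static final AtomicInteger stolen_tasks = new AtomicInteger();

Changed the constructor like this:

    public CounterFJ(List<T> list, T test, int start, int end, int factor) {
        _list = list;
        _threadId = Thread.currentThread().getId(); // added
        _test = test;
        _start = start;
        _end = end;
        _divideFactor = factor;
    }

And added the comparison in the compute method:

    @Override
    protected Integer compute() {
        long thisThreadId = Thread.currentThread().getId();
        if (_threadId != thisThreadId) {
            stolen_tasks.incrementAndGet();
        }
        // rest of the method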
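For anyone who wants to try the idea in isolation, here is a minimal, self-contained sketch of the same technique. It is not the code from the question: the names `StealDemo`, `CountTask` and `movedTasks` are made up for illustration, the task works on a `long[]` and splits in two rather than by a configurable factor, and the pool size of 4 is an arbitrary choice. It also prints `ForkJoinPool.getStealCount()`, the pool's own estimate, as a cross-check for the hand-rolled counter.

```java
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

public class StealDemo {

    // Hypothetical counter: tasks whose compute() ran on a thread other
    // than the one that constructed them.
    public static final AtomicInteger movedTasks = new AtomicInteger();

    public static class CountTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 1_000;
        private final long[] data;
        private final long target;
        private final int start;
        private final int end;
        // Id of the thread that created this task.
        private final long creatorId = Thread.currentThread().getId();

        public CountTask(long[] data, long target, int start, int end) {
            this.data = data;
            this.target = target;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            // A different executing thread means the task changed hands.
            if (Thread.currentThread().getId() != creatorId) {
                movedTasks.incrementAndGet();
            }
            if (end - start <= THRESHOLD) {
                int count = 0;
                for (int i = start; i < end; i++) {
                    if (data[i] == target) {
                        count++;
                    }
                }
                return count;
            }
            int mid = (start + end) >>> 1;
            CountTask left = new CountTask(data, target, start, mid);
            CountTask right = new CountTask(data, target, mid, end);
            left.fork();                      // queued; another worker may steal it
            int rightCount = right.compute(); // runs on the current thread
            return left.join() + rightCount;
        }
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        Random r = new Random(42);
        for (int i = 0; i < data.length; i++) {
            data[i] = r.nextInt(200);
        }
        ForkJoinPool pool = new ForkJoinPool(4);
        int found = pool.invoke(new CountTask(data, 100, 0, data.length));
        System.out.println("Occurrences: " + found);
        System.out.println("Tasks run off their creating thread: " + movedTasks.get());
        System.out.println("Pool steal count (estimate): " + pool.getStealCount());
        pool.shutdown();
    }
}
```

The two numbers vary from run to run and need not match exactly. The root task is created on the calling thread, so it will usually be counted by `movedTasks` as well, and `getStealCount()` is documented only as an estimate.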