Java:当线程池中的所有线程都完成时,通知主类/不同线程中对象的相同实例

ThreadPoolExecutor所有线程都完成后,如何通知我的主类实例化ThreadPoolExecutor

 ThreadPoolExecutor threadPool = null; ThreadClass threadclass1; ThreadClass threadclass2; final ArrayBlockingQueue queue = new ArrayBlockingQueue(maxPoolSize); puclic MyClass(){ threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue); threadClass1 = new ThreadClass; threadClass2 = new ThreadClass; threadPool.execute(threadClass1); threadPool.execute(threadClass2); //Now I would like to do something until the threadPool is done working //The threads fill a ConcurrentLinkedQueueand I would like to poll //the queue as it gets filled by the threads and output //it to XML via JAX-RS } 

编辑1

让我的线程从某个地方获取数据并将这些信息填充到ConcurrentLinkedQueue中我基本上想在MyClass中执行一些操作来用结果更新XML输出。 当所有线程都被终止时,我想返回真实的JAX-RS webservice,它实例化了MyClass,因此webservice知道已经获取了所有数据,现在它可以显示最终的XML文件

编辑2

我将Queue传递给线程,以便他们可以将项添加到队列中。 当一个driver完成向articleQueue添加项目时,我想在我的主类中执行一个操作,从Queue轮询实体并将其交给response对象以某种方式显示它。

当我将队列传递给线程时,它们是使用相同的对象还是使用对象的“副本”,以便线程内的更改不会影响主对象? 那不是我想要的行为。 当我检查Driver中的articleQueue的大小为18articleQueueDriverController的大小为0

当一个线程在我的while循环之外的队列中添加了一些东西时,有没有更好的方法做出反应? 如何修改我的代码以访问不同类中的同一对象?

DriverController

 public class DriverController { Queue
articleQueue; ThreadPoolExecutor threadPool = null; final ArrayBlockingQueue queue = new ArrayBlockingQueue( maxPoolSize); public DriverController(Response response) { articleQueue = new ConcurrentLinkedQueue
(); threadPool = new ThreadPoolExecutor(); Driver driver = new Driver(this.articleQueue); threadPool.execute(driver); // More drivers would be executed here which add to the queue while (threadPool.getActiveCount() > 0) { // this.articleQueue.size() gives back 0 here ... why? if(articleQueue.size()>0){ response.addArticle(articleQueue.poll()); } } } }

司机

 public class Driver implements Runnable{ private Queue
articleQueue; public DriverAlliedElectronics(Queue articleQueue) { this.articleQueue = articleQueue; } public boolean getData() { // Here would be the code where the article is created ... this.articleQueue.offer(article); return true; } public void run() { this.getData(); // this.articleQueue.size() gives back 18 here ... } }

也许ExecutorCompletionService可能适合您:

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html

上面的链接示例:

 void solve(Executor e, Collection> solvers) throws InterruptedException, ExecutionException { CompletionService ecs = new ExecutorCompletionService(e); for (Callable s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } } 

您应该尝试使用以下代码段

 //Now I would like to wait until the threadPool is done working threadPool.shutdown(); while (!threadPool.isTerminated()) { try { threadPool.awaitTermination(10, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } 

而不是使用execute你应该使用提交 。 这将返回一个Future实例,您可以在该实例上等待任务完成。 这样您就不需要轮询或关闭池。

我认为没有办法明确地做到这一点。 您可以轮询getCompletedTaskCount()以等待它变为零。

为什么不收集提交时返回的Future对象并检查所有正在完成的对象? 只需依次调用每个上的get() 。 由于该呼叫阻止你只是依次等待每个呼叫,然后逐渐通过该设置,直到你等待每个呼叫。

或者,您可以提交线程,并在执行程序上调用shutdown() 。 这样,将执行提交的任务,然后调用terminate()方法。 如果你覆盖了这个,那么一旦完成所有任务就会得到一个回调(显然你不能再使用那个执行器)。

从参考文档来看,您有几个选择:

 ThreadPoolExecutor threadPool = null; ThreadClass threadclass1; ThreadClass threadclass2; final ArrayBlockingQueue queue = new ArrayBlockingQueue(maxPoolSize); puclic MyClass(){ threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue); threadClass1 = new ThreadClass; threadClass2 = new ThreadClass; threadPool.execute(threadClass1); threadPool.execute(threadClass2); //Now I would like to wait until the threadPool is done working //Option 1: shutdown() and awaitTermination() threadPool.shutDown(); try { threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) } catch (InterruptedException e) { e.printStackTrace(); } //Option 2: getActiveCount() while (threadPool.getActiveCount() > 0) { try { Thread.sleep(1000); } catch (InterruptedException ignored) {} } //Option 3: getCompletedTaskCount() while (threadPool.getCompletedTaskCount() < totalNumTasks) { try { Thread.sleep(1000); } catch (InterruptedException ignored) {} } } 

考虑到所有事情,我认为shutdown()awaitTermination()是三者中最好的选择。

我觉得你有点过分工作了。 你并不真正关心线程或线程池,这是正确的。 Java提供了很好的抽象,所以你不必这样做。 您只需知道任务何时完成,并且存在相应的方法。 只需提交你的工作,等待期货说他们已经完成了。 如果您真的想在单个任务完成后立即知道,您可以查看所有期货并在任何一个完成后立即采取行动。 如果没有,你只关心一切都已完成,你可以从我即将发布的代码中删除一些复杂性。 试试这个大小(注意MultithreadedJaxrsResource是可执行的):

 import javax.ws.rs.*; import javax.ws.rs.core.MediaType; import java.util.*; import java.util.concurrent.*; @Path("foo") public class MultithreadedJaxrsResource { private ExecutorService executorService; public MultithreadedJaxrsResource(ExecutorService executorService) { this.executorService = executorService; } @GET @Produces(MediaType.APPLICATION_XML) public AllMyArticles getStuff() { List> futures = new ArrayList
>(); // Submit all the tasks to run for (int i = 0; i < 10; i++) { futures.add(executorService.submit(new Driver(i + 1))); } AllMyArticles articles = new AllMyArticles(); // Wait for all tasks to finish // If you only care that everything is done and not about seeing // when each one finishes, this outer do/while can go away, and // you only need a single for loop to wait on each future. boolean allDone; do { allDone = true; Iterator
> futureIterator = futures.iterator(); while (futureIterator.hasNext()) { Future
future = futureIterator.next(); if (future.isDone()) { try { articles.articles.add(future.get()); futureIterator.remove(); } catch (InterruptedException e) { // thread was interrupted. don't do that. throw new IllegalStateException("broken", e); } catch (ExecutionException e) { // execution of the Callable failed with an // exception. check it out. throw new IllegalStateException("broken", e); } } else { allDone = false; } } } while (!allDone); return articles; } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); AllMyArticles stuff = new MultithreadedJaxrsResource(executorService).getStuff(); System.out.println(stuff.articles); executorService.shutdown(); } } class Driver implements Callable
{ private int i; // Just to differentiate the instances public Driver(int i) { this.i = i; } public Article call() { // Simulate taking some time for each call try { Thread.sleep(1000 / i); } catch (InterruptedException e) { System.err.println("oops"); } return new Article(i); } } class AllMyArticles { public final List
articles = new ArrayList
(); } class Article { public final int i; public Article(int i) { this.i = i; } @Override public String toString() { return "Article{" + "i=" + i + '}'; } }

通过这种方式,你可以清楚地看到任务按照它们完成的顺序返回,因为最后一个任务首先完成,这要归功于最短时间的睡眠。 如果您不关心完成顺序并且只想等待所有完成,则循环变得更加简单:

 for (Future
future : futures) { try { articles.articles.add(future.get()); } catch (InterruptedException e) { // thread was interrupted. don't do that. throw new IllegalStateException("broken", e); } catch (ExecutionException e) { // execution of the Callable failed with an exception. check it out. throw new IllegalStateException("broken", e); } }