从Java线程返回值

我有一个类似以下的Java线程:

public class MyThread extends Thread { MyService service; String id; public MyThread(String id) { this.id = node; } public void run() { User user = service.getUser(id) } } 

我有大约300个ID,每隔几秒钟 – 我启动线程来调用每个id。 例如。

 for(String id: ids) { MyThread thread = new MyThread(id); thread.start(); } 

现在,我想从每个线程收集结果,并对数据库进行批量插入,而不是每2秒进行300次数据库插入。

知道我怎么能做到这一点?

如果要在执行数据库更新之前收集所有结果,可以使用invokeAll方法。 如果您一次提交一个任务,这就照顾了所需的簿记,如daveb建议的那样。

 private static final ExecutorService workers = Executors.newCachedThreadPool(); ... Collection> tasks = new ArrayList>(); for (final String id : ids) { tasks.add(new Callable() { public User call() throws Exception { return svc.getUser(id); } }); } /* invokeAll blocks until all service requests complete, * or a max of 10 seconds. */ List> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS); for (Future f : results) { User user = f.get(); /* Add user to batch update. */ ... } /* Commit batch. */ ... 

规范方法是使用CallableExecutorServicesubmit一个Callable submitExecutorService返回一个(类型安全的) Future ,您可以get结果。

 class TaskAsCallable implements Callable { @Override public Result call() { return a new Result() // this is where the work is done. } } ExecutorService executor = Executors.newFixedThreadPool(300); Future task = executor.submit(new TaskAsCallable()); Result result = task.get(); // this blocks until result is ready 

在您的情况下,您可能希望使用invokeAll返回Futures List ,或者在向执行程序添加任务时自己创建该列表。 要收集结果,只需在每个上面调用get

将结果存储在对象中。 完成后,让它自己放入同步集合(想到一个同步队列)。

如果您希望收集要提交的结果,请从队列中获取所有内容并从对象中读取结果。 你甚至可能让每个对象知道如何将它自己的结果“发布”到数据库中,这样就可以提交不同的类,并且所有类都使用完全相同的小巧,优雅的循环来处理。

JDK中有很多工具可以帮助解决这个问题,但是一旦你开始认为你的线程是一个真正的对象而不仅仅是一堆围绕“运行”方法的废话,它就非常容易。 一旦你开始考虑对象,编程变得更简单,更令人满意。

在Java8中,使用CompletableFuture有更好的方法。 假设我们有从数据库获取id的类,为简单起见,我们可以返回如下的数字,

 static class GenerateNumber implements Supplier{ private final int number; GenerateNumber(int number){ this.number = number; } @Override public Integer get() { try { TimeUnit.SECONDS.sleep(1); }catch (InterruptedException e){ e.printStackTrace(); } return this.number; } } 

现在,一旦每个未来的结果准备就绪,我们就可以将结果添加到并发集合中。

 Collection results = new ConcurrentLinkedQueue<>(); int tasks = 10; CompletableFuture[] allFutures = new CompletableFuture[tasks]; for (int i = 0; i < tasks; i++) { int temp = i; CompletableFuture future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor); allFutures[i] = future.thenAccept(results::add); } 

现在我们可以在所有期货准备就绪时添加回调,

 CompletableFuture.allOf(allFutures).thenAccept(c->{ System.out.println(results); // do something with result }); 

您可以创建一个队列或列表,将其传递给您创建的线程,线程将其结果添加到列表中,该列表由执行批量插入的使用者清空。

最简单的方法是将一个对象传递给每个线程(每个线程一个对象),以后会包含结果。 主线程应该保持对每个结果对象的引用。 连接所有线程后,您可以使用结果。

 public class TopClass { List users = new ArrayList(); void addUser(User user) { synchronized(users) { users.add(user); } } void store() throws SQLException { //storing code goes here } class MyThread extends Thread { MyService service; String id; public MyThread(String id) { this.id = node; } public void run() { User user = service.getUser(id) addUser(user); } } } 

你需要将结果存储在像singleton这样的东西中。 这必须正确同步。

编辑 :我知道这不是最好的建议,因为处理原始Threads并不是一个好主意。 但鉴于这个问题会起作用,不是吗? 我可能不会被投票,但为什么要投票呢?

你可以创建一个扩展Observable的类。 然后,您的线程可以调用Observable类中的方法,该方法将通过调用Observable.notifyObservers(Object)来通知在该观察者中注册的任何类。

观察类将实现Observer,并使用Observable注册自己。 然后,您将实现一个更新(Observable,Object)方法,该方法在调用Observerable.notifyObservers(Object)时被调用。