具有不同返回类型的可变callables

我手头的问题是我的方法one(), two(), three(), four()有不同的返回类型,比如A, B, C, D和我需要产生可变数量的线程(每个方法一个,具体取决于用例。这意味着我希望一次调用一个方法的子集。)现在,我使用cachedThreadPool来提交这些callables。 下面的一些代码:

 public class Dispatcher { public void dispatch(List methodNames) { //Now I am going to iterate through the list of methodNames //And submit each method to the `ExecutorService` for(MethodNames m : methodNames) { switch(m) { case ONE: //submit one() //wait and get the future Future future = cachePool.submit(new Callable() { @Override public A call() { return one(); }); A response = future.get(); break; .... } } } } public enum MethodNames { ONE, TWO, THREE, FOUR } //Example methods: public A one() { } public B two() { } 

我的问题是上面是如何进行所有方法调用而不必等待一个完成。 另外,我如何收集所有futures并等待它们完成因为所有期货都有不同的generics类型Future, Future等我在case语句中调用submit()所以我不要在案例之外无权访问返回的Future 。 现在我可以做一个if, else而不是for循环,但我想弄清楚是否有更好的方法来实现这一点。

我会这样做 –

  • 创建一个界面,让我们说I
  • ABCD实现I
  • 使用枚举valueOf和对象overriding来删除case语句。
  • 使用多态并从所有方法返回I
  • 下面是代码(不包括ABCDI ),因为它们是普通类和接口 – 没有做太多。

以下是代码:

Dispatcher.java

 package com.test.thread; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; public class Dispatcher { public void dispatch() throws InterruptedException, ExecutionException { Map> reponse = new HashMap>(); ExecutorService cachePool = Executors.newCachedThreadPool(); for (MethodNames methodNames : MethodNames.values()) { Future future = cachePool.submit(methodNames.worker()); reponse.put(methodNames, future); } cachePool.awaitTermination(5, TimeUnit.MINUTES); for(MethodNames key : reponse.keySet()) { I result = reponse.get(key).get(); System.out.println("result :: " + result); } } public static void main(String[] args) throws InterruptedException, ExecutionException { new Dispatcher().dispatch(); } 

}

MethodNames.java

 package com.test.thread; import java.util.concurrent.*; public enum MethodNames { ONE { @Override public Callable worker() { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread1"); TimeUnit.SECONDS.sleep(30); return new A(); }}; } }, TWO { @Override public Callable worker() throws InterruptedException { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread2"); TimeUnit.SECONDS.sleep(30); return new B(); }}; } }, THREE { @Override public Callable worker() throws InterruptedException { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread3"); TimeUnit.SECONDS.sleep(30); return new C(); }}; } }, FOUR { @Override public Callable worker() throws InterruptedException { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread"); TimeUnit.SECONDS.sleep(30); return new D(); }}; } }; public abstract Callable worker() throws InterruptedException; 

}

最好将get与未来分开,因此将一个Callable作为参数添加到枚举中。 然后enum瞬间可以创建一个Future。 不幸的是,对于通用类型,需要存储生成的类,并用于正确键入。

 public enum MethodNames { ONE(A.class, () -> { one() }), TWO(B.class, () -> { two() }), ... FOUR(D.class, () -> { four() }); private final Class resultType; private final Future future; private  MethodNames(Class resultType, Callable callable) { this.resultType = resultType; future = cachePool.submit(callable); } public  T getResponse(Class type) { Object response = future.get(); return resultType.asSubclass(type).cast(response); } } 

如果这些是您提交给ExecutorService的唯一Callables ,那么您可以在提交作业后调用cachePool上的cachePool (可以是Runnable而不是Callable

 public class Dispatcher { public void dispatch(List methodNames) { for(MethodNames m : methodNames) { switch(m) { case ONE: //submit one() cachePool.execute(new Runnable() { @Override public void run() { // do work }); break; .... } } } cachePool.awaitTermination(100, TimeUnit.HOURS); } 

如果cachePool有其他不相关的任务或由于某些其他原因你不能使用awaitTermination那么你可以阻塞信号量 。 使用零许可初始化Semaphore ,每个任务在完成时将release许可,并且dispatch方法在semaphore.acquire(methodNames.size())semaphore.acquire(methodNames.size()) ,等待所有任务调用release (并因此完成)。 注意Runnabletry-finally块,否则如果Runnable抛出exception,那么它将不会调用release并且dispatch方法将永远阻塞。

 public class Dispatcher { public void dispatch(List methodNames) { Semaphore semaphore = new Semaphore(0); for(MethodNames m : methodNames) { switch(m) { case ONE: //submit one() cachePool.execute(new Runnable() { @Override public void run() { try { // do work } finally { semaphore.release(); } }); break; .... } } } semaphore.acquire(methodNames.size()); } 

如果你正在收集任务的结果(看起来你现在看起来不像这样,但需求往往会改变),那么每个Runnable都可以将其结果存储在共享的ConcurrentLinkedQueue或其他一些线程安全的数据中结构(或每个返回类型的一个数据结构等),然后当semaphore.acquireawaitTermination方法解除awaitTermination时, dispatch可以处理这些结果。

这些成分

您需要以多个步骤处理结果:

等待多个期货

在这种情况下,您可以在签名中使用Future ,因为只是等待您不必知道结果类型。 所以你可以创建一个方法void waitForAll(List< Future > futures)

以安全的方式从未知的未来获得结果

为此,您需要某种知道Future将提供的类型的句柄。 由于Java的类型擦除,这个句柄必须以某种方式存储Class 。 所以最简单的句柄就是相应的Future本身(在你的例子中TAB之一)。

因此,您可以将期货存储在Map, Future) (或MultiMap )中,并使用Future get(Class handle)类型的附加get方法。

您可以使用此句柄替换enum MethodNames

收据:将成分与解决方案相结合

  1. 创建如上定义的Map / MultiMap ,例如通过取消对dispatch方法的多次调用。
  2. 将waitAll与地图的值列表一起使用
  3. 使用上述get方法从Map / MultiMap获取相应的结果

你正在尝试做一些像fork-join或map-reduce之类的东西。 您可能会找到一种既定机制来完成此任务,而不是重新发明轮子。

无论如何回到你等待所有方法完成和继续前进的具体问题

正如你所提到的,你不应该失去指向未来的指针。 因此,创建一个结构Result ,它可以容纳所有未来。 等待当前线程中的Result 。 在Result中运行另一个线程,它将监视期货并在返回所有方法时通知

Result通知时,您将在当前线程中向前移动,并返回Result对象保存的所有返回数据。

简单(有限)解决方案 :如果您可以为返回值定义接口/超类(类似于sql.ResultSet ),这非常有用,否则没有那么多…然后在处理结果时再次出现切换因为你必须施展它。

调度:

 dispatcher.dispatch(new SuperABC[] {new A(), new B(), new C()}); // dispatcher.dispatch(new A(), new B(), new C()); with ... notation, see comment below 

接口:

 public interface Result { ... } public interface SuperABC extends Callable {} 

类示例:

 public class A implements SuperABC { public Result call() throws Exception { ... } } 

调度方式:

 public Result[] dispatch(SuperABC[] tasks) { // dispatch(SuperABC... tasks) List> refs = new ArrayList>(tasks.length); Result[] ret = new Result[tasks.length]; for (SuperABC task : tasks) refs.add(cachedThreadPool.submit(task)); for (int i = 0; i < tasks.length; i++) ret[i] = refs.get(i).get(); return ret; }