具有不同返回类型的可变callables
我手头的问题是我的方法one(), two(), three(), four()
有不同的返回类型,比如A, B, C, D
和我需要产生可变数量的线程(每个方法一个,具体取决于用例。这意味着我希望一次调用一个方法的子集。)现在,我使用cachedThreadPool
来提交这些callables。 下面的一些代码:
public class Dispatcher { public void dispatch(List methodNames) { //Now I am going to iterate through the list of methodNames //And submit each method to the `ExecutorService` for(MethodNames m : methodNames) { switch(m) { case ONE: //submit one() //wait and get the future Future future = cachePool.submit(new Callable() { @Override public A call() { return one(); }); A response = future.get(); break; .... } } } } public enum MethodNames { ONE, TWO, THREE, FOUR } //Example methods: public A one() { } public B two() { }
我的问题是上面是如何进行所有方法调用而不必等待一个完成。 另外,我如何收集所有futures
并等待它们完成因为所有期货都有不同的generics类型Future, Future
等我在case语句中调用submit()
所以我不要在案例之外无权访问返回的Future
。 现在我可以做一个if, else
而不是for
循环,但我想弄清楚是否有更好的方法来实现这一点。
我会这样做 –
- 创建一个界面,让我们说
I
。 - 让
A
,B
,C
和D
实现I
。 - 使用枚举
valueOf
和对象overriding
来删除case语句。 - 使用多态并从所有方法返回
I
- 下面是代码(不包括
A
,B
,C
,D
,I
),因为它们是普通类和接口 – 没有做太多。
以下是代码:
Dispatcher.java
package com.test.thread; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; public class Dispatcher { public void dispatch() throws InterruptedException, ExecutionException { Map> reponse = new HashMap>(); ExecutorService cachePool = Executors.newCachedThreadPool(); for (MethodNames methodNames : MethodNames.values()) { Future future = cachePool.submit(methodNames.worker()); reponse.put(methodNames, future); } cachePool.awaitTermination(5, TimeUnit.MINUTES); for(MethodNames key : reponse.keySet()) { I result = reponse.get(key).get(); System.out.println("result :: " + result); } } public static void main(String[] args) throws InterruptedException, ExecutionException { new Dispatcher().dispatch(); }
}
MethodNames.java
package com.test.thread; import java.util.concurrent.*; public enum MethodNames { ONE { @Override public Callable worker() { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread1"); TimeUnit.SECONDS.sleep(30); return new A(); }}; } }, TWO { @Override public Callable worker() throws InterruptedException { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread2"); TimeUnit.SECONDS.sleep(30); return new B(); }}; } }, THREE { @Override public Callable worker() throws InterruptedException { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread3"); TimeUnit.SECONDS.sleep(30); return new C(); }}; } }, FOUR { @Override public Callable worker() throws InterruptedException { return new Callable() { @Override public I call() throws InterruptedException { System.out.println("Thread"); TimeUnit.SECONDS.sleep(30); return new D(); }}; } }; public abstract Callable worker() throws InterruptedException;
}
最好将get与未来分开,因此将一个Callable作为参数添加到枚举中。 然后enum瞬间可以创建一个Future。 不幸的是,对于通用类型,需要存储生成的类,并用于正确键入。
public enum MethodNames { ONE(A.class, () -> { one() }), TWO(B.class, () -> { two() }), ... FOUR(D.class, () -> { four() }); private final Class> resultType; private final Future> future; private MethodNames(Class resultType, Callable callable) { this.resultType = resultType; future = cachePool.submit(callable); } public T getResponse(Class type) { Object response = future.get(); return resultType.asSubclass(type).cast(response); } }
如果这些是您提交给ExecutorService
的唯一Callables
,那么您可以在提交作业后调用cachePool上的cachePool
(可以是Runnable
而不是Callable
)
public class Dispatcher { public void dispatch(List methodNames) { for(MethodNames m : methodNames) { switch(m) { case ONE: //submit one() cachePool.execute(new Runnable() { @Override public void run() { // do work }); break; .... } } } cachePool.awaitTermination(100, TimeUnit.HOURS); }
如果cachePool
有其他不相关的任务或由于某些其他原因你不能使用awaitTermination
那么你可以阻塞信号量 。 使用零许可初始化Semaphore
,每个任务在完成时将release
许可,并且dispatch
方法在semaphore.acquire(methodNames.size())
上semaphore.acquire(methodNames.size())
,等待所有任务调用release
(并因此完成)。 注意Runnable
的try-finally
块,否则如果Runnable
抛出exception,那么它将不会调用release
并且dispatch
方法将永远阻塞。
public class Dispatcher { public void dispatch(List methodNames) { Semaphore semaphore = new Semaphore(0); for(MethodNames m : methodNames) { switch(m) { case ONE: //submit one() cachePool.execute(new Runnable() { @Override public void run() { try { // do work } finally { semaphore.release(); } }); break; .... } } } semaphore.acquire(methodNames.size()); }
如果你正在收集任务的结果(看起来你现在看起来不像这样,但需求往往会改变),那么每个Runnable
都可以将其结果存储在共享的ConcurrentLinkedQueue或其他一些线程安全的数据中结构(或每个返回类型的一个数据结构等),然后当semaphore.acquire
或awaitTermination
方法解除awaitTermination
时, dispatch
可以处理这些结果。
这些成分
您需要以多个步骤处理结果:
等待多个期货
在这种情况下,您可以在签名中使用Future>
,因为只是等待您不必知道结果类型。 所以你可以创建一个方法void waitForAll(List< Future> > futures)
。
以安全的方式从未知的未来获得结果
为此,您需要某种知道Future
将提供的类型的句柄。 由于Java的类型擦除,这个句柄必须以某种方式存储Class
。 所以最简单的句柄就是相应的Future
本身(在你的例子中T
是A
和B
之一)。
因此,您可以将期货存储在Map
(或MultiMap
)中,并使用Future
类型的附加get方法。
您可以使用此句柄替换enum MethodNames
。
收据:将成分与解决方案相结合
- 创建如上定义的
Map
/MultiMap
,例如通过取消对dispatch
方法的多次调用。 - 将waitAll与地图的值列表一起使用
- 使用上述get方法从
Map
/MultiMap
获取相应的结果
你正在尝试做一些像fork-join或map-reduce之类的东西。 您可能会找到一种既定机制来完成此任务,而不是重新发明轮子。
无论如何回到你等待所有方法完成和继续前进的具体问题:
正如你所提到的,你不应该失去指向未来的指针。 因此,创建一个结构Result ,它可以容纳所有未来。 等待当前线程中的Result 。 在Result中运行另一个线程,它将监视期货并在返回所有方法时通知 。
当Result通知时,您将在当前线程中向前移动,并返回Result对象保存的所有返回数据。
简单(有限)解决方案 :如果您可以为返回值定义接口/超类(类似于sql.ResultSet
),这非常有用,否则没有那么多…然后在处理结果时再次出现切换因为你必须施展它。
调度:
dispatcher.dispatch(new SuperABC[] {new A(), new B(), new C()}); // dispatcher.dispatch(new A(), new B(), new C()); with ... notation, see comment below
接口:
public interface Result { ... } public interface SuperABC extends Callable {}
类示例:
public class A implements SuperABC { public Result call() throws Exception { ... } }
调度方式:
public Result[] dispatch(SuperABC[] tasks) { // dispatch(SuperABC... tasks) List> refs = new ArrayList>(tasks.length); Result[] ret = new Result[tasks.length]; for (SuperABC task : tasks) refs.add(cachedThreadPool.submit(task)); for (int i = 0; i < tasks.length; i++) ret[i] = refs.get(i).get(); return ret; }