ExecutorService,按顺序执行任务但从池中获取线程
我正在尝试构建ExecutorService
的实现,我们称之为SequentialPooledExecutor
,具有以下属性。
-
SequentialPooledExecutor
所有实例共享同一个线程池 -
对
SequentialPooledExecutor
的同一实例的调用按SequentialPooledExecutor
执行。
换句话说,实例在开始处理其队列中的下一个任务之前等待当前正在执行的任务的终止。
我目前正在自己实施SequentialPooledExecutor
,但我想知道我是否正在重新发明轮子。 我查看了ExecutorService
不同实现,例如Executors
类提供的那些,但我找不到符合我要求的实现。
你知道我现有的实现是否缺失,还是我应该继续自己实现界面?
编辑:
我认为我的要求不是很清楚,让我们看看我是否可以用其他的话来解释它。
假设我有一系列会话,比如1000个(我之前称之为执行程序实例的东西)。 我可以向会话提交任务,我希望保证提交给同一会话的所有任务都按顺序执行。 但是,属于不同会话的任务应该彼此没有依赖关系。
我想定义一个执行这些任务的ExecutorService
,但是使用有限数量的线程,比方说200,但确保在同一个会话中的前一个任务完成之前不启动任务。
我不知道是否存在已经存在的任何内容,或者我是否应该自己实现这样的ExecutorService
。
如果ExecutorService
顺序执行任务,只需使用Executors.newSingleThreadExecutor()
创建一个只有一个线程的 Executors.newSingleThreadExecutor()
。
如果您有不同类型的任务,并且您只想按顺序ExecutorService
相同类型的任务 ,则可以对相同类型的任务 使用相同的单线程ExecutorService
, 而无需重新发明轮子。
因此,假设您有1 000
种不同类型的任务,您可以使用200
个单线程ExecutorService
,您需要自己实现的唯一事情是您始终需要对给定类型的任务使用相同的单线程ExecutorService
。
如果您有数千个必须按顺序处理的密钥,但是您没有数千个核心,则可以使用散列策略来分发这样的工作
ExecutorService[] es = // many single threaded executors public Future submit(String key, Callable calls) { int h = Math.abs(key.hashCode() % es.length); return es[h].submit(calls); }
一般来说,只需要2 * N个线程来保持N个核心忙,如果你的任务是CPU绑定的,那么只会增加开销。
@ Nicolas的回答可能是你最好的选择,因为它简单,经过充分测试,效率很高。
但是,如果它不能满足您的要求,我会这样做:
- 不要将“SequentialPooledExecutor”作为执行程序服务,使其成为单线程执行程序服务“池”的外观
- 让你的“SequentialPooledExecutor”实现一个submit方法(使用Runnable / Callable和一个表示“队列名称”的String),返回一个Future,就像一个执行者服务
- 在调用此方法时,通过获取队列名称的哈希值并将其分派给相应的内部执行程序,使“SequentialPooledExecutor”调度到其内部单线程执行程序服务之一。
在步骤3中进行的散列部分允许您将每个“队列名称”的任务始终转到“SequentialPooledExecutor”内的相同(单线程)执行程序服务。
另一种可能的途径是使用CompletionStage
和CompletableFuture
。 实际上,这些是可听的未来(具有完成处理程序)。 有了这些,第一次进行“会话”时,您可以使用第一个任务创建CompletableFuture
,并保持它。 在每个新任务中,您将先前的未来与新任务相结合,调用thenAcceptAsync
(或任何类似的)。 你得到的是一个线性的执行任务链。
private Map> sessionTasks = new HashMap<>(); private ExecutorService pool = Executors.newFixedThreadPool(200); public void submit(int sessionId, Runnable task) { if (sessionTasks.containsKey(sessionId)) { sessionTasks.compute(sessionId, (i, c) -> c.thenRunAsync(task, pool)); } else { sessionTasks.put(sessionId, CompletableFuture.runAsync(task, pool)); } }
如果会话没有任务,则会创建一个新任务并在提供的池中运行。 如果会话在添加新任务时已经有任务,则后者将链接(使用thenRun
)到前一个任务,确保顺序。
如果要配置有界队列,请使用ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
对于您的用例,请使用ThreadPoolExecutor
作为
ThreadPoolExecutor executor = ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new ArrayBlockingQueue(1000));
上面的代码大小队列大小是ThreadPoolExecutor
为1000.如果要使用自定义拒绝执行处理程序,可以配置RejectedExeutionHandler
。
相关SE问题:
如何正确使用Java Executor?
最近我遇到了同样的问题。 没有内置类,但队列足够接近。 我的简单实现看起来像这样(也许对其他人在同一问题上寻找示例很有帮助)
public class SerializedAsyncRunnerSimple implements Runnable { private final ExecutorService pool; protected final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); //thread safe queue protected final AtomicBoolean active = new AtomicBoolean(false); public SerializedAsyncRunnerSimple(ExecutorService threadPool) {this.pool = threadPool;} public void addWork(Runnable r){ queue.add(r); startExecutionIfInactive(); } private void startExecutionIfInactive() { if(active.compareAndSet(false, true)) { pool.execute(this); } } @Override public synchronized void run() { while(!queue.isEmpty()){ queue.poll().run(); } active.set(false); //all future adds will not be executed on this thread anymore if(!queue.isEmpty()) { //if some items were added to the queue after the last queue.poll startExecutionIfInactive();// trigger an execution on next thread } }
- ExecutorService应该是静态的还是全局的
- 完成所有ExecutorService任务后,程序不会立即终止
- 如何终止multithreading中超时的任务?
- 如何在使用具有线程超时function的ExecutorService时提高性能?
- 如何决定是使用newCachedThreadPool还是newFixedThreadPool?
- Control ExecutorService每秒最多执行N个任务
- java Fork / Join池,ExecutorService和CountDownLatch
- 如何在java中设置Socket写入时间?
- ExecutorService Future ::变得非常慢