单线程处理任务而不排队进一步的请求

我要求异步执行任务,同时丢弃任何进一步的请求,直到任务完成。

同步方法只是将任务排队,不会跳过。 我最初想过使用SingleThreadExecutor,但也会排队任务。 然后我查看了ThreadPoolExecutor,但是它读取队列以获取要执行的任务,因此将执行一个任务并且至少有一个任务排队(其他任务可以使用ThreadPoolExecutor.DiscardPolicy丢弃)。

我唯一能想到的是使用信号量来阻止队列。 我来自以下示例来展示我正在努力实现的目标。 有更简单的方法吗? 我错过了一些明显的东西吗

import java.util.concurrent.*; public class ThreadPoolTester { private static ExecutorService executor = Executors.newSingleThreadExecutor(); private static Semaphore processEntry = new Semaphore(1); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 20; i++) { kickOffEntry(i); Thread.sleep(200); } executor.shutdown(); } private static void kickOffEntry(final int index) { if (!processEntry.tryAcquire()) return; executor. submit( new Callable() { public Void call() throws InterruptedException { try { System.out.println("start " + index); Thread.sleep(1000); // pretend to do work System.out.println("stop " + index); return null; } finally { processEntry.release(); } } } ); } } 

样本输出

 start 0 stop 0 start 5 stop 5 start 10 stop 10 start 15 stop 15 

采取axtavt的答案并转换上面的例子给出了以下更简单的解决方案。

 import java.util.concurrent.*; public class SyncQueueTester { private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadPoolExecutor.DiscardPolicy()); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 20; i++) { kickOffEntry(i); Thread.sleep(200); } executor.shutdown(); } private static void kickOffEntry(final int index) { executor. submit( new Callable() { public Void call() throws InterruptedException { System.out.println("start " + index); Thread.sleep(1000); // pretend to do work System.out.println("stop " + index); return null; } } ); } } 

看起来像SynchronousQueue支持的执行程序,所需的策略可以满足您的需求:

 executor = new ThreadPoolExecutor( 1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadPoolExecutor.DiscardPolicy()); 

如果没有队列,我就不需要遗嘱执行人了。 单独使用信号量似乎就足够了。 我正在使用下面的代码,以避免在它已经运行时运行相同的代码。 只是确保semaphorestatic volatile ,这使得信号量成为类的唯一信号量,并在信号量改变后立即将信号量引用传播给其他线程的堆

 if (this.getSemaphore().tryAcquire()) { try { process(); } catch (Exception e) { } finally { this.getSemaphore().release(); } } else { logger.info(">>>>> Job already running, skipping go"); }