ThreadPoolExecutor中的死锁

遇到ThreadPoolExecutor驻留在execute(Runnable)函数中的情况,而所有ThreadPool线程都在getTask func中等待,workQueue为空。

有人有什么想法吗?

ThreadPoolExecutor是使用ArrayBlockingQueue创建的,而corePoolSize == maximumPoolSize = 4

[编辑]更准确地说,线程在ThreadPoolExecutor.exec(Runnable command) func中被阻止。 它有执行任务,但没有执行。

[Edit2]执行程序在工作队列( ArrayBlockingQueue )内的某处被阻塞。

[Edit3] callstack:

 thread = front_end(224) at sun.misc.Unsafe.park(Native methord) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653) at net.listenThread.WorkersPool.execute(WorkersPool.java:45) 

同时workQueue为空(使用远程调试检查)

[Edit4]使用ThreadPoolExecutor代码:

 public WorkersPool(int size) { pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_CAPACITY), new ThreadFactory() { @NotNull private final AtomicInteger threadsCount = new AtomicInteger(0); @NotNull public Thread newThread(@NotNull Runnable r) { final Thread thread = new Thread(r); thread.setName("net_worker_" + threadsCount.incrementAndGet()); return thread; } }, new RejectedExecutionHandler() { public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) { Verify.warning("new task " + r + " is discarded"); } }); } public void execute(@NotNull Runnable task) { pool.execute(task); } public void stopWorkers() throws WorkersTerminationFailedException { pool.shutdownNow(); try { pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new WorkersTerminationFailedException("Workers-pool termination failed", e); } } } 

听起来这是一个JVM早于6u21的错误。 某些(可能是所有)操作系统的编译本机代码存在问题。

从链接:

该错误是由于各种Parker :: park()路径中丢失的内存障碍导致的,这些障碍可导致唤醒和挂起丢失。 (请注意,内置同步使用的PlatformEvent :: park不容易受到此问题的影响)。 -XX:+ UseMembar构成一种解决方法,因为状态转换逻辑中的membar屏障隐藏了Parker ::中的问题。 (也就是说,使用-UseMembar机制没有任何问题,但+ UseMembar隐藏了Parker::)的错误。 这是在JDK 5.0中添加java.util.concurrent引入的第一天错误。 我开发了一种简单的C模式的故障,它似乎更有可能在现代AMD和Nehalem平台上体现,可能是因为更长的存储缓冲区耗时更长。 我为Doug Lea提供了Parker :: park的初步修复,似乎可以消除这个bug。 我将把这个修复程序提供给运行时。 (我还将通过额外的测试用例和更长的解释来增加CR)。 这可能是后端口的良好候选者。

链接: JVM Bug

可以使用变通方法,但最好只获取最新的Java副本。

我没有看到ThreadPoolExecutorexecute(Runnable)代码中有任何锁定。 唯一的变量是workQueue 。 您为ThreadPoolExecutor提供了哪种BlockingQueue

关于死锁的话题:

您可以通过检查Windows上的或UNIX系统上的kill -QUIT提供的完整线程转储来确认这是一个死锁。

获得该数据后,您可以检查线程。 以下是Sun关于检查线程转储的文章(建议阅读)的相关摘录:

对于挂起,死锁或冻结程序:如果您认为程序挂起,则生成堆栈跟踪并检查状态MW或CW中的线程。 如果程序死锁,那么一些系统线程可能会显示为当前线程,因为JVM没有其他任何东西可以执行。

更轻松的说明:如果您在IDE中运行,可以确保在这些方法中没有启用断点。

这种死锁可能是因为您从执行程序本身运行任务。 例如,您提交了一个任务,而这个任务会激活另外4个任务。 如果你的池大小等于4,那么你只是完全溢出它,最后一个任务将等到任务返回值。 但是第一个任务等待所有分叉任务完成。

正如有人提到过的,这听起来像是正常的行为,ThreadPoolExecutor只是在等待做一些工作。 如果你想停止它,你需要打电话:

executor.shutdown()

让它终止,通常后面是executor.awaitTermination

库代码源如下(实际上是来自http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip的类),
– 有点复杂 – 如果我没有弄错的话,可以防止重复调用FutureTask – 但似乎不容易出现死锁 – 非常简单的ThreadPool用法:

 package net.spy.memcached.transcoders; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import net.spy.memcached.CachedData; import net.spy.memcached.compat.SpyObject; /** * Asynchronous transcoder. */ public class TranscodeService extends SpyObject { private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy()); /** * Perform a decode. */ public  Future decode(final Transcoder tc, final CachedData cachedData) { assert !pool.isShutdown() : "Pool has already shut down."; TranscodeService.Task task = new TranscodeService.Task( new Callable() { public T call() { return tc.decode(cachedData); } }); if (tc.asyncDecode(cachedData)) { this.pool.execute(task); } return task; } /** * Shut down the pool. */ public void shutdown() { pool.shutdown(); } /** * Ask whether this service has been shut down. */ public boolean isShutdown() { return pool.isShutdown(); } private static class Task extends FutureTask { private final AtomicBoolean isRunning = new AtomicBoolean(false); public Task(Callable callable) { super(callable); } @Override public T get() throws InterruptedException, ExecutionException { this.run(); return super.get(); } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { this.run(); return super.get(timeout, unit); } @Override public void run() { if (this.isRunning.compareAndSet(false, true)) { super.run(); } } } } 

绝对奇怪。

但在编写自己的TPE之前,请尝试:

  • 另一个BlockingQueue impl。,例如LinkedBlockingQueue

  • 在ArrayBlockingQueue中指定fairness = true,即使用new ArrayBlockingQueue(n, true)

从这两个选项我会选择第二个’因为offer()被阻止是非常奇怪的; 我想到的一个原因 – Linux上的线程调度策略。 就像一个假设。