Java读写锁定要求,具有来自不同线程的锁定和释放

我试图找到一个不那么笨重的Java并发问题的解决方案。

问题的关键在于,当仍有工作线程处于活动状态时,我需要对块进行关闭调用,但关键的方面是每个工作任务都是异步生成和完成的,因此保持和释放必须由不同的线程完成。 一旦他们的工作完成,我需要他们以某种方式向关闭线程发送信号。 只是为了让事情更有趣,工作线程不能互相阻塞,所以我不确定信号量在这个特定实例中的应用。

我有一个解决方案,我认为安全地完成了这项工作,但是我对Java并发工具的不熟悉使我认为可能有一个更容易或更优雅的模式。 在这方面的任何帮助将不胜感激。

这是我到目前为止所做的,除了评论之外相当稀疏:

final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock(); volatile private int activeWorkerThreads; private boolean isShutdown; private void workerTask() { try { // Point A: Worker tasks mustn't block each other. shutdownLock.readLock().lock(); // Point B: I only want worker tasks to continue if the shutdown signal // hasn't already been received. if (isShutdown) return; activeWorkerThreads ++; // Point C: This async method call returns immediately, soon after which // we release our lock. The shutdown thread may then acquire the write lock // but we want it to continue blocking until all of the asynchronous tasks // have completed. executeAsynchronously(new Runnable() { @Override final public void run() { try { // Do stuff. } finally { // Point D: Release of shutdown thread loop, if there are no other // active worker tasks. activeWorkerThreads --; } } }); } finally { shutdownLock.readLock().unlock(); } } final public void shutdown() { try { // Point E: Shutdown thread must block while any worker threads // have breached Point A. shutdownLock.writeLock().lock(); isShutdown = true; // Point F: Is there a better way to wait for this signal? while (activeWorkerThreads > 0) ; // Do shutdown operation. } finally { shutdownLock.writeLock().unlock(); } } 

在此先感谢您的帮助!

拉斯

您可以在此方案中使用信号量,而不需要忙等待shutdown()调用。 想到这一点的方法是将一套门票分发给工人,以表明他们在飞行中。 如果shutdown()方法可以获取所有票证,那么它知道它已经耗尽了所有工作人员并且没有活动。 因为#acquire()是一个阻塞调用,所以shutdown()不会旋转。 我已经将这种方法用于分布式主工作库,并且可以轻松扩展它以处理超时和重试。

 Executor executor = // ... final int permits = // ... final Semaphore semaphore = new Semaphore(permits); void schedule(final Runnable task) { semaphore.acquire(); try { executor.execute(new Runnable() { @Override public run() { try { task.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore.release(); throw e; } } void shutDown() { semaphore.acquireUninterruptibly(permits); // do stuff } 

将activeWorkerThreads声明为volatile不允许您执行activeWorkerThreads ++,因为++只是简写,

 activeWorkerThreads = activeWorkerThreads + 1; 

这不是primefaces的。 请改用AtomicInteger。

executeAsynchronously()是否将作业发送到ExecutorService? 如果是这样,你可以使用awaitTermination方法,所以你的关闭钩子将是,

 executor.shutdown(); executor.awaitTermination(1, TimeUnit.Minutes); 

ExecutorService应该是sbridges提到的首选解决方案。

作为替代方案,如果工作线程的数量是固定的,那么您可以使用CountDownLatch :

 final CountDownLatch latch = new CountDownLatch(numberOfWorkers); 

latch传递给每个工作线程,并在任务完成时调用latch.countDown()

从主线程调用latch.await()以等待所有任务完成。

哇哇 永远不要这样做:

  // Point F: Is there a better way to wait for this signal? while (activeWorkerThreads > 0) ; 

你在旋转和消耗CPU。 使用适当的通知:

首先: synchronize对象, 然后检查activeWorkerThreads,并在对象上wait()如果它仍然> 0):

 synchronized (mutexObject) { while (activeWorkerThreads > 0) { mutexObject.wait(); } } 

第二步:让工作人员在减少activeWorkerThreads计数后通知()该对象。 在调用notify之前,必须在对象上进行同步。

 synchronized (mutexObject) { activeWorkerThreads--; mutexObject.notify(); } 

第三:每当你触摸activeWorkerThreads时,看到你(实现1和2之后)同步对象,使用它作为保护; 变量不需要是volatile。

然后:用作控制对activeWorkerThreads访问的互斥锁的同一对象也可用于控制对isShutdown的访问。 例:

 synchronized (mutexObject) { if (isShutdown) { return; } } 

除了不可思议的少量时间之外,这不会导致工作人员相互阻挡(无论如何,您可能无法避免使用读写锁定)。

这更像是对sbridges答案的评论,但作为评论提交有点太长了。

无论如何,只有1条评论。

当您关闭执行程序时,如果您使用默认实现(如Executors.newSingleThreadExecutor() ),则向执行程序提交新任务将导致未选中RejectedExecutionException 。 因此,在您的情况下,您可能希望使用以下代码。

码:

 new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), new ThreadPoolExecutor.DiscardPolicy()); 

这样,在调用shutdown()之后提交给执行程序的任务就会被忽略。 上面的参数(1,1 …等)应该生成一个执行器,它基本上是一个单线程执行器,但不会抛出运行时exception。