可以递增的锁存

有没有人知道是否有任何闩锁实现执行以下操作:

  • 有一种减小锁存值的方法,或者如果该值为零则等待
  • 有一种等待锁存值为零的方法
  • 有一种方法可以为锁存器的值添加一个数字

您可以使用如下的简单实现,而不是从AQS开始。 它有些天真(它与AQS无锁算法同步)但除非你期望在一个满足的场景中使用它,否则它就足够了。

public class CountUpAndDownLatch { private CountDownLatch latch; private final Object lock = new Object(); public CountUpAndDownLatch(int count) { this.latch = new CountDownLatch(count); } public void countDownOrWaitIfZero() throws InterruptedException { synchronized(lock) { while(latch.getCount() == 0) { lock.wait(); } latch.countDown(); lock.notifyAll(); } } public void waitUntilZero() throws InterruptedException { synchronized(lock) { while(latch.getCount() != 0) { lock.wait(); } } } public void countUp() { //should probably check for Integer.MAX_VALUE synchronized(lock) { latch = new CountDownLatch((int) latch.getCount() + 1); lock.notifyAll(); } } public int getCount() { synchronized(lock) { return (int) latch.getCount(); } } } 

注意:我没有深入测试它,但它看起来像预期的那样:

 public static void main(String[] args) throws InterruptedException { final CountUpAndDownLatch latch = new CountUpAndDownLatch(1); Runnable up = new Runnable() { @Override public void run() { try { System.out.println("IN UP " + latch.getCount()); latch.countUp(); System.out.println("UP " + latch.getCount()); } catch (InterruptedException ex) { } } }; Runnable downOrWait = new Runnable() { @Override public void run() { try { System.out.println("IN DOWN " + latch.getCount()); latch.countDownOrWaitIfZero(); System.out.println("DOWN " + latch.getCount()); } catch (InterruptedException ex) { } } }; Runnable waitFor0 = new Runnable() { @Override public void run() { try { System.out.println("WAIT FOR ZERO " + latch.getCount()); latch.waitUntilZero(); System.out.println("ZERO " + latch.getCount()); } catch (InterruptedException ex) { } } }; new Thread(waitFor0).start(); up.run(); downOrWait.run(); Thread.sleep(100); downOrWait.run(); new Thread(up).start(); downOrWait.run(); } 

输出:

 IN UP 1 UP 2 WAIT FOR ZERO 1 IN DOWN 2 DOWN 1 IN DOWN 1 ZERO 0 DOWN 0 IN DOWN 0 IN UP 0 DOWN 0 UP 0 

你也可以使用Phaser(java.util.concurrent.Phaser)

 final Phaser phaser = new Phaser(1); // register self while (/* some condition */) { phaser.register(); // Equivalent to countUp // do some work asynchronously, invoking // phaser.arriveAndDeregister() (equiv to countDown) in a finally block } phaser.arriveAndAwaitAdvance(); // await any async tasks to complete 

我希望这有帮助。

java.util.concurrent.Semaphore似乎符合要求。

  • 获得()或获得(n)
  • 还获得()(不知道我明白这里的区别是什么) (*)
  • release()或release(n)

(*)好的,没有内置方法等待信号量变得不可用 。 我想你自己编写了自己的包装器,它首先执行tryAcquire ,如果失败则触发你的“忙碌事件”(并继续使用正常的acquire )。 每个人都需要打电话给你的包装。 也许是Semaphore的子类?

对于那些需要基于AQS的解决方案的人来说,这是一个适合我的方案:

 public class CountLatch { private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } @Override protected int tryAcquireShared(int arg) { return count.get() == releaseValue ? 1 : -1; } @Override protected boolean tryReleaseShared(int arg) { return true; } } private final Sync sync; private final AtomicLong count; private volatile long releaseValue; public CountLatch(final long initial, final long releaseValue) { this.releaseValue = releaseValue; this.count = new AtomicLong(initial); this.sync = new Sync(); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public long countUp() { final long current = count.incrementAndGet(); if (current == releaseValue) { sync.releaseShared(0); } return current; } public long countDown() { final long current = count.decrementAndGet(); if (current == releaseValue) { sync.releaseShared(0); } return current; } public long getCount() { return count.get(); } } 

使用初始值和目标值初始化同步器。 一旦达到目标值(通过向上和/或向下计数),将释放等待的线程。

我需要一个并使用与CountDownLatch相同的策略构建它,它使用AQS(非阻塞),这个类也非常类似(如果不准确)到为Apache Camel创建的一个,我认为它也比JDK Phaser更轻,这个就像JDK的CountDownLact一样,它不会让你倒数到零以下,并且会让你倒计时:

import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer;

 public class CountingLatch { /** * Synchronization control for CountingLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private Sync() { } private Sync(final int initialState) { setState(initialState); } int getCount() { return getState(); } protected int tryAcquireShared(final int acquires) { return getState()==0 ? 1 : -1; } protected boolean tryReleaseShared(final int delta) { // Decrement count; signal when transition to zero for(; ; ){ final int c=getState(); final int nextc=c+delta; if(nextc<0){ return false; } if(compareAndSetState(c,nextc)){ return nextc==0; } } } } private final Sync sync; public CountingLatch() { sync=new Sync(); } public CountingLatch(final int initialCount) { sync=new Sync(initialCount); } public void increment() { sync.releaseShared(1); } public int getCount() { return sync.getCount(); } public void decrement() { sync.releaseShared(-1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(final long timeout) throws InterruptedException { return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout)); } } 

这是CounterLatch的变体,可从Apache站点获得。

由于最熟悉的原因,它们的版本会阻塞调用程序线程, 变量( AtomicInteger )处于给定值。

但是调整这段代码的容易程度很高,这样你就可以选择Apache版本的function,或者……说“等到柜台达到一定值”。 可以说后者将具有更多的适用性。 在我的特殊情况下,我对此提出了异议,因为我想检查所有“块”是否已在SwingWorker.process() …但我已经找到了它的其他用途。

这里是用Jython编写的,正式是世界上最好的语言(TM)。 我将在适当的时候沙沙作响。

 class CounterLatch(): def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ): self.count = java.util.concurrent.atomic.AtomicLong( initial ) self.signal = java.util.concurrent.atomic.AtomicLong( wait_value ) class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ): def tryAcquireShared( sync_self, arg ): if lift_on_reached: return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1 else: return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1 def tryReleaseShared( self, args ): return True self.sync = Sync() self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False def await( self, *args ): if args: assert len( args ) == 2 assert type( args[ 0 ] ) is int timeout = args[ 0 ] assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit unit = args[ 1 ] return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) else: self.sync.acquireSharedInterruptibly( 1 ) def count_relative( self, n ): previous = self.count.addAndGet( n ) if previous == self.signal.get(): self.sync.releaseShared( 0 ) return previous 

注意,Apache版本使用关键字volatile作为signalreleased 。 在Jython中我不认为这样存在,但是使用AtomicIntegerAtomicBoolean应该确保任何线程中没有值“过时”。

用法示例:

在SwingWorker构造函数中:

 self.publication_counter_latch = CounterLatch() 

在SW.publish中:

 # increase counter value BEFORE publishing chunks self.publication_counter_latch.count_relative( len( chunks ) ) self.super__publish( chunks ) 

在SW.process中:

 # ... do sthg [HERE] with the chunks! # AFTER having done what you want to do with your chunks: self.publication_counter_latch.count_relative( - len( chunks ) ) 

在等待块处理停止的线程中:

 worker.publication_counter_latch.await() 

似乎CountDownLatch将按您的意愿执行:

使用给定计数初始化CountDownLatch。 由于countDown()方法的调用,await方法阻塞直到当前计数达到零,之后释放所有等待的线程,并且任何后续的await调用立即返回。 这是一次性现象 – 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier。

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html