可重置的倒计时补丁

我需要的东西直接等同于CountDownLatch ,但是可以重置(保持线程安全!)。 我不能使用经典的同步结构,因为它们在这种情况下根本不起作用(复杂的锁定问题)。 目前,我正在创建许多CountDownLatch对象,每个对象都替换前一个。 我相信这是在GC中的年轻一代(由于物体数量庞大)。 您可以看到使用下面的锁存器的代码(它是用于ns-3网络模拟器接口的java.net模拟的一部分)。

一些想法可能是尝试CyclicBarrier (JDK5 +)或Phaser (JDK7)

我可以测试代码并回到找到解决此问题的任何人,因为我是唯一可以将其插入正在运行的系统中以查看发生了什么的人:)

 /** * */ package kokunet; import java.io.IOException; import java.nio.channels.ClosedSelectorException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import kokuks.IConnectionSocket; import kokuks.KKSAddress; import kokuks.KKSSocket; import kokuks.KKSSocketListener; /** * KSelector * @version 1.0 * @author Chris Dennett */ public class KSelector extends SelectorImpl { // True if this Selector has been closed private volatile boolean closed = false; // Lock for close and cleanup final class CloseLock {} private final Object closeLock = new CloseLock(); private volatile boolean selecting = false; private volatile boolean wakeup = false; class SocketListener implements KKSSocketListener { protected volatile CountDownLatch latch = null; /** * */ public SocketListener() { newLatch(); } protected synchronized CountDownLatch newLatch() { return this.latch = new CountDownLatch(1); } protected synchronized void refreshReady(KKSSocket socket) { if (!selecting) return; synchronized (socketToChannel) { SelChImpl ch = socketToChannel.get(socket); if (ch == null) { System.out.println("ks sendCB: channel not found for socket: " + socket); return; } synchronized (channelToKey) { SelectionKeyImpl sk = channelToKey.get(ch); if (sk != null) { if (handleSelect(sk)) { latch.countDown(); } } } } } @Override public void connectionSucceeded(KKSSocket socket) { refreshReady(socket); } @Override public void connectionFailed(KKSSocket socket) { refreshReady(socket); } @Override public void dataSent(KKSSocket socket, long bytesSent) { refreshReady(socket); } @Override public void sendCB(KKSSocket socket, long bytesAvailable) { refreshReady(socket); } @Override public void onRecv(KKSSocket socket) { refreshReady(socket); } @Override public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) { refreshReady(socket); } @Override public void normalClose(KKSSocket socket) { wakeup(); } @Override public void errorClose(KKSSocket socket) { wakeup(); } } protected final Map socketToChannel = new HashMap(); protected final Map channelToKey = new HashMap(); protected final SocketListener currListener = new SocketListener(); protected Thread selectingThread = null; SelChImpl getChannelForSocket(KKSSocket s) { synchronized (socketToChannel) { return socketToChannel.get(s); } } SelectionKeyImpl getSelKeyForChannel(KKSSocket s) { synchronized (channelToKey) { return channelToKey.get(s); } } protected boolean markRead(SelectionKeyImpl impl) { synchronized (impl) { if (!impl.isValid()) return false; impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ); return selectedKeys.add(impl); } } protected boolean markWrite(SelectionKeyImpl impl) { synchronized (impl) { if (!impl.isValid()) return false; impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE); return selectedKeys.add(impl); } } protected boolean markAccept(SelectionKeyImpl impl) { synchronized (impl) { if (!impl.isValid()) return false; impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT); return selectedKeys.add(impl); } } protected boolean markConnect(SelectionKeyImpl impl) { synchronized (impl) { if (!impl.isValid()) return false; impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT); return selectedKeys.add(impl); } } /** * @param provider */ protected KSelector(SelectorProvider provider) { super(provider); } /* (non-Javadoc) * @see kokunet.SelectorImpl#implClose() */ @Override protected void implClose() throws IOException { provider().getApp().printMessage("implClose: closed: " + closed); synchronized (closeLock) { if (closed) return; closed = true; for (SelectionKey sk : keys) { provider().getApp().printMessage("dereg1"); deregister((AbstractSelectionKey)sk); provider().getApp().printMessage("dereg2"); SelectableChannel selch = sk.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); } implCloseInterrupt(); } } protected void implCloseInterrupt() { wakeup(); } private boolean handleSelect(SelectionKey k) { synchronized (k) { boolean notify = false; if (!k.isValid()) { k.cancel(); ((SelectionKeyImpl)k).channel.socket().removeListener(currListener); return false; } SelectionKeyImpl ski = (SelectionKeyImpl)k; if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0) { if (ski.channel.socket().getRxAvailable() > 0) { notify |= markRead(ski); } } if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0) { if (ski.channel.socket().getTxAvailable() > 0) { notify |= markWrite(ski); } } if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0) { if (!ski.channel.socket().isConnectionless()) { IConnectionSocket cs = (IConnectionSocket)ski.channel.socket(); if (!ski.channel.socket().isAccepting() && !cs.isConnecting() && !cs.isConnected()) { notify |= markConnect(ski); } } } if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0) { //provider().getApp().printMessage("accept check: ski: " + ski + ", connectionless: " + ski.channel.socket().isConnectionless() + ", listening: " + ski.channel.socket().isListening() + ", hasPendingConn: " + (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections())); if (!ski.channel.socket().isConnectionless() && ski.channel.socket().isListening()) { IConnectionSocket cs = (IConnectionSocket)ski.channel.socket(); if (cs.hasPendingConnections()) { notify |= markAccept(ski); } } } return notify; } } private boolean handleSelect() { boolean notify = false; // get initial status for (SelectionKey k : keys) { notify |= handleSelect(k); } return notify; } /* (non-Javadoc) * @see kokunet.SelectorImpl#doSelect(long) */ @Override protected int doSelect(long timeout) throws IOException { processDeregisterQueue(); long timestartedms = System.currentTimeMillis(); synchronized (selectedKeys) { synchronized (currListener) { wakeup = false; selectingThread = Thread.currentThread(); selecting = true; } try { handleSelect(); if (!selectedKeys.isEmpty() || timeout == 0) { return selectedKeys.size(); } //TODO: useless op if we have keys available for (SelectionKey key : keys) { ((SelectionKeyImpl)key).channel.socket().addListener(currListener); } try { while (!wakeup && isOpen() && selectedKeys.isEmpty()) { CountDownLatch latch = null; synchronized (currListener) { if (wakeup || !isOpen() || !selectedKeys.isEmpty()) { break; } latch = currListener.newLatch(); } try { if (timeout > 0) { long currtimems = System.currentTimeMillis(); long remainingMS = (timestartedms + timeout) - currtimems; if (remainingMS > 0) { latch.await(remainingMS, TimeUnit.MILLISECONDS); } else { break; } } else { latch.await(); } } catch (InterruptedException e) { } } return selectedKeys.size(); } finally { for (SelectionKey key : keys) { ((SelectionKeyImpl)key).channel.socket().removeListener(currListener); } } } finally { synchronized (currListener) { selecting = false; selectingThread = null; wakeup = false; } } } } /* (non-Javadoc) * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl) */ @Override protected void implRegister(SelectionKeyImpl ski) { synchronized (closeLock) { if (closed) throw new ClosedSelectorException(); synchronized (channelToKey) { synchronized (socketToChannel) { keys.add(ski); socketToChannel.put(ski.channel.socket(), ski.channel); channelToKey.put(ski.channel, ski); } } } } /* (non-Javadoc) * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl) */ @Override protected void implDereg(SelectionKeyImpl ski) throws IOException { synchronized (channelToKey) { synchronized (socketToChannel) { keys.remove(ski); socketToChannel.remove(ski.channel.socket()); channelToKey.remove(ski.channel); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); } } } /* (non-Javadoc) * @see kokunet.SelectorImpl#wakeup() */ @Override public Selector wakeup() { synchronized (currListener) { if (selecting) { wakeup = true; selecting = false; selectingThread.interrupt(); selectingThread = null; } } return this; } } 

干杯,
克里斯

我复制了CountDownLatch并实现了一个reset()方法,它将内部Sync类重置为其初始状态(起始计数):)似乎工作正常。 没有更多不必要的对象创建\ o /由于sync是私有的,因此无法进行子类化。 嘘。

 import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * A synchronization aid that allows one or more threads to wait until * a set of operations being performed in other threads completes. * * 

A {@code CountDownLatch} is initialized with a given count. * The {@link #await await} methods block until the current count reaches * zero due to invocations of the {@link #countDown} method, after which * all waiting threads are released and any subsequent invocations of * {@link #await await} return immediately. This is a one-shot phenomenon * -- the count cannot be reset. If you need a version that resets the * count, consider using a {@link CyclicBarrier}. * *

A {@code CountDownLatch} is a versatile synchronization tool * and can be used for a number of purposes. A * {@code CountDownLatch} initialized with a count of one serves as a * simple on/off latch, or gate: all threads invoking {@link #await await} * wait at the gate until it is opened by a thread invoking {@link * #countDown}. A {@code CountDownLatch} initialized to N * can be used to make one thread wait until N threads have * completed some action, or some action has been completed N times. * *

A useful property of a {@code CountDownLatch} is that it * doesn't require that threads calling {@code countDown} wait for * the count to reach zero before proceeding, it simply prevents any * thread from proceeding past an {@link #await await} until all * threads could pass. * *

Sample usage: Here is a pair of classes in which a group * of worker threads use two countdown latches: *

    *
  • The first is a start signal that prevents any worker from proceeding * until the driver is ready for them to proceed; *
  • The second is a completion signal that allows the driver to wait * until all workers have completed. *
* *
 * class Driver { // ... * void main() throws InterruptedException { * CountDownLatch startSignal = new CountDownLatch(1); * CountDownLatch doneSignal = new CountDownLatch(N); * * for (int i = 0; i < N; ++i) // create and start threads * new Thread(new Worker(startSignal, doneSignal)).start(); * * doSomethingElse(); // don't let run yet * startSignal.countDown(); // let all threads proceed * doSomethingElse(); * doneSignal.await(); // wait for all to finish * } * } * * class Worker implements Runnable { * private final CountDownLatch startSignal; * private final CountDownLatch doneSignal; * Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { * this.startSignal = startSignal; * this.doneSignal = doneSignal; * } * public void run() { * try { * startSignal.await(); * doWork(); * doneSignal.countDown(); * } catch (InterruptedException ex) {} // return; * } * * void doWork() { ... } * } * * 

* *

Another typical usage would be to divide a problem into N parts, * describe each part with a Runnable that executes that portion and * counts down on the latch, and queue all the Runnables to an * Executor. When all sub-parts are complete, the coordinating thread * will be able to pass through await. (When threads must repeatedly * count down in this way, instead use a {@link CyclicBarrier}.) * *

 * class Driver2 { // ... * void main() throws InterruptedException { * CountDownLatch doneSignal = new CountDownLatch(N); * Executor e = ... * * for (int i = 0; i < N; ++i) // create and start threads * e.execute(new WorkerRunnable(doneSignal, i)); * * doneSignal.await(); // wait for all to finish * } * } * * class WorkerRunnable implements Runnable { * private final CountDownLatch doneSignal; * private final int i; * WorkerRunnable(CountDownLatch doneSignal, int i) { * this.doneSignal = doneSignal; * this.i = i; * } * public void run() { * try { * doWork(i); * doneSignal.countDown(); * } catch (InterruptedException ex) {} // return; * } * * void doWork() { ... } * } * * 

* *

Memory consistency effects: Actions in a thread prior to calling * {@code countDown()} * happen-before * actions following a successful return from a corresponding * {@code await()} in another thread. * * @since 1.5 * @author Doug Lea */ public class ResettableCountDownLatch { /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; public final int startCount; Sync(int count) { this.startCount = count; setState(startCount); } int getCount() { return getState(); } public int tryAcquireShared(int acquires) { return getState() == 0? 1 : -1; } public boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } public void reset() { setState(startCount); } } private final Sync sync; /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public ResettableCountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. * *

If the current count is zero then this method returns immediately. * *

If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: *

    *

  • The count reaches zero due to invocations of the * {@link #countDown} method; or *
  • Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. *

* *

If the current thread: *

    *

  • has its interrupted status set on entry to this method; or *
  • is {@linkplain Thread#interrupt interrupted} while waiting, *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted * while waiting */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void reset() { sync.reset(); } /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}, * or the specified waiting time elapses. * *

If the current count is zero then this method returns immediately * with the value {@code true}. * *

If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of three things happen: *

    *

  • The count reaches zero due to invocations of the * {@link #countDown} method; or *
  • Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or *
  • The specified waiting time elapses. *

* *

If the count reaches zero then the method returns with the * value {@code true}. * *

If the current thread: *

    *

  • has its interrupted status set on entry to this method; or *
  • is {@linkplain Thread#interrupt interrupted} while waiting, *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * *

If the specified waiting time elapses then the value {@code false} * is returned. If the time is less than or equal to zero, the method * will not wait at all. * * @param timeout the maximum time to wait * @param unit the time unit of the {@code timeout} argument * @return {@code true} if the count reached zero and {@code false} * if the waiting time elapsed before the count reached zero * @throws InterruptedException if the current thread is interrupted * while waiting */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * *

If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * *

If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); } /** * Returns the current count. * *

This method is typically used for debugging and testing purposes. * * @return the current count */ public long getCount() { return sync.getCount(); } /** * Returns a string identifying this latch, as well as its state. * The state, in brackets, includes the String {@code "Count ="} * followed by the current count. * * @return a string identifying this latch, as well as its state */ public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }

根据@Fidel -s的回答,我为ResettableCountDownLatch做了直接替换。 我所做的改变

  • mLatchprivate volatile
  • mInitialCountprivate final
  • simple await()的返回类型已更改为void。

否则,原始代码也很酷。 所以,这是完整的增强代码:

 public class ResettableCountDownLatch { private final int initialCount; private volatile CountDownLatch latch; public ResettableCountDownLatch(int count) { initialCount = count; latch = new CountDownLatch(count); } public void reset() { latch = new CountDownLatch(initialCount); } public void countDown() { latch.countDown(); } public void await() throws InterruptedException { latch.await(); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return latch.await(timeout, unit); } } 

更新

基于@Systemplanet -s评论,这是一个更安全的reset()版本:

  // An atomic reference is required because reset() is not that atomic anymore, not even with `volatile`. private final AtomicReference latchHolder = new AtomicReference<>(); public void reset() { // obtaining a local reference for modifying the required latch final CountDownLatch oldLatch = latchHolder.getAndSet(null); if (oldLatch != null) { // checking the count each time to prevent unnecessary countdowns due to parallel countdowns while (0L < oldLatch.getCount()) { oldLatch.countDown(); } } } 

基本上,它是简单和安全之间的选择。 即如果您愿意将责任转移到代码的客户端,那么在reset()设置引用null就足够了。

另一方面,如果您想让这段代码的用户轻松一点,那么您需要使用更多技巧。

我不确定这是否存在致命缺陷,但我最近遇到了同样的问题并通过每次我想重置时只是实例化一个新的CountDownLatch对象来解决它。 像这样的东西:

服务员:

 bla(); latch.await(); //now the latch has counted down to 0 blabla(); 

CountDowner

 foo(); latch.countDown(); //now the latch has counted down to 0 latch = new CountDownLatch(1); Waiter.receiveReferenceToNewLatch(latch); bar(); 

显然这是一个沉重的抽象,但到目前为止它对我有用,并且不要求你修改任何类定义。

Phaser有更多选择,我们可以使用它实现resettable countdownLatch。

请阅读以下网站的以下基本概念

https://examples.javacodegeeks.com/core-java/util/concurrent/phaser/java-util-concurrent-phaser-example/

http://netjs.blogspot.in/2016/01/phaser-in-java-concurrency.html

 import java.util.concurrent.Phaser; /** * Resettable countdownLatch using phaser */ public class PhaserExample { public static void main(String[] args) throws InterruptedException { Phaser phaser = new Phaser(3); // you can use constructor hint or // register() or mixture of both // register self... so parties are incremented to 4 (3+1) now phaser.register(); //register is one time call for all the phases. //means no need to register for every phase int phasecount = phaser.getPhase(); System.out.println("Phasecount is " + phasecount); new PhaserExample().testPhaser(phaser, 2000); new PhaserExample().testPhaser(phaser, 4000); new PhaserExample().testPhaser(phaser, 6000); // similar to await() in countDownLatch/CyclicBarrier // parties are decremented to 3 (4+1) now phaser.arriveAndAwaitAdvance(); // once all the thread arrived at same level, barrier opens System.out.println("Barrier has broken."); phasecount = phaser.getPhase(); System.out.println("Phasecount is " + phasecount); //second phase new PhaserExample().testPhaser(phaser, 2000); new PhaserExample().testPhaser(phaser, 4000); new PhaserExample().testPhaser(phaser, 6000); phaser.arriveAndAwaitAdvance(); // once all the thread arrived at same level, barrier opens System.out.println("Barrier has broken."); phasecount = phaser.getPhase(); System.out.println("Phasecount is " + phasecount); } private void testPhaser(final Phaser phaser, final int sleepTime) { // phaser.register(); //Already constructor hint is given so not // required new Thread() { @Override public void run() { try { Thread.sleep(sleepTime); System.out.println(Thread.currentThread().getName() + " arrived"); // phaser.arrive(); //similar to CountDownLatch#countDown() phaser.arriveAndAwaitAdvance();// thread will wait till Barrier opens // arriveAndAwaitAdvance is similar to CyclicBarrier#await() } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " after passing barrier"); } }.start(); } } 

另一个替补

 import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class ResettableCountDownLatch { int mInitialCount; CountDownLatch mLatch; public ResettableCountDownLatch(int count) { mInitialCount = count; mLatch = new CountDownLatch(count); } public void reset() { mLatch = new CountDownLatch(mInitialCount); } public void countDown() { mLatch.countDown(); } public boolean await() throws InterruptedException { boolean result = mLatch.await(); return result; } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { boolean result = mLatch.await(timeout, unit); return result; } } 

看起来您想将异步I / O转为同步。 使用异步I / O的整个想法是避免线程,但CountDownLatch需要使用线程。 这在你的问题中是一个明显的矛盾。 所以你可以:

  • 继续使用线程并使用同步I / O而不是Selectors和suff。 这将更加简单和可靠
  • 继续使用异步I / 0并放弃CountDownLatch。 然后你需要一个异步库 – 看看RxJava,Akka或df4j。
  • 继续开发您的项目以获得乐趣。 然后,您可以尝试使用java.util.Semaphore而不是CountDownLatch,或使用synchronized / wait / notify编写您自己的同步类。