How to acquire a lock by a key

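What is the best way to prevent concurrent updates of a single record in a key-value set without locking the entire set? Semantically, I'm looking for some kind of locking by key (ideally a Java implementation, but not necessarily):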

    interface LockByKey {
        void lock(String key);   // acquire an exclusive lock for a key
        void unlock(String key); // release lock for a key
    }

This lock is intended to synchronize access to a remote store, so a synchronized Java collection is not an option.

Guava has something like this being released in 13.0; you can get it out of HEAD if you like.

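Striped<Lock> more or less allocates a specific number of locks, and then assigns strings to locks based on their hash code. The API looks more or less like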

    Striped<Lock> locks = Striped.lock(stripes);
    Lock l = locks.get(string);
    l.lock();
    try {
        // do stuff
    } finally {
        l.unlock();
    }

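The controllable number of stripes lets you trade concurrency against memory usage, because allocating a full lock for each string key can get expensive. Essentially, you only get lock contention when two keys in use at the same time collide on the same stripe, which is (predictably) rare.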

(Disclosure: I contribute to Guava.)
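To make the hash-to-stripe mapping concrete, here is a minimal sketch using only the JDK. It is not Guava's implementation; the class name `SimpleStripedLocks`, the method `indexFor`, and the choice of `Math.floorMod` are assumptions made for illustration. It shows that two different keys with the same hash code always end up on the same lock.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper (not Guava's Striped): a fixed pool of locks indexed by key hash.
final class SimpleStripedLocks {
    private final Lock[] stripes;

    SimpleStripedLocks(int stripeCount) {
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    // Math.floorMod keeps the index non-negative even when hashCode() is negative.
    int indexFor(String key) {
        return Math.floorMod(key.hashCode(), stripes.length);
    }

    // Equal keys always map to the same lock; unequal keys may share one.
    Lock get(String key) {
        return stripes[indexFor(key)];
    }

    public static void main(String[] args) {
        SimpleStripedLocks locks = new SimpleStripedLocks(16);
        // "Aa" and "BB" both have hashCode 2112, so they share a stripe.
        System.out.println(locks.get("Aa") == locks.get("BB")); // prints true
    }
}
```

More stripes means fewer accidental collisions at the cost of more lock objects held in memory.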

This is how I did it. And yes, I agree that if two different strings share the same hash code they will end up acquiring the same lock.

    class LockByKey {
        ObjectForString objHolder = new ObjectForString(100);

        public void lockThenWorkForKey(String key) {
            synchronized (objHolder.valueOf(key)) {
                // DoSomeWork
            }
        }
    }

    public final class ObjectForString {
        private final Object[] cache;
        private final int cacheSize;
        final int mask;

        public ObjectForString(int size) {
            // Find power-of-two sizes best matching arguments
            int ssize = 1;
            while (ssize < size) {
                ssize <<= 1;
            }
            mask = ssize - 1;
            cache = new Object[ssize];
            cacheSize = ssize;
            // build the Cache
            for (int i = 0; i < cacheSize; i++) {
                this.cache[i] = new Object();
            }
        }

        public Object valueOf(String key) {
            int index = key.hashCode();
            // mask is non-negative, so the index is valid even for negative hash codes
            return cache[index & mask];
        }
    }

Keep a mutex/lock per bucket. This ensures that only keys that collide on the same bucket wait on that mutex.
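One way this could look, as a rough sketch that implements the `LockByKey` interface from the question: the class name `BucketLockByKey` and the bucket count are made up for the example, and `ReentrantLock` is just one possible mutex choice.

```java
import java.util.concurrent.locks.ReentrantLock;

// The interface from the question.
interface LockByKey {
    void lock(String key);   // acquire an exclusive lock for a key
    void unlock(String key); // release lock for a key
}

// Hypothetical implementation: one ReentrantLock per bucket, bucket chosen by key hash.
final class BucketLockByKey implements LockByKey {
    private final ReentrantLock[] buckets;

    BucketLockByKey(int bucketCount) {
        buckets = new ReentrantLock[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            buckets[i] = new ReentrantLock();
        }
    }

    private ReentrantLock bucketFor(String key) {
        // floorMod avoids a negative index for negative hash codes
        return buckets[Math.floorMod(key.hashCode(), buckets.length)];
    }

    @Override
    public void lock(String key) {
        bucketFor(key).lock();
    }

    @Override
    public void unlock(String key) {
        // Must be called by the thread that locked the key,
        // otherwise ReentrantLock throws IllegalMonitorStateException.
        bucketFor(key).unlock();
    }
}
```

Callers would wrap the remote-store access in `lock(key)` / `try { ... } finally { unlock(key); }`. Note that a thread holding one key and then locking a second key may block on an unrelated thread if the two keys fall into different buckets that are acquired in opposite order elsewhere, so nested key locks need a consistent ordering.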

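If the "record" you mention is a mutable object, and "update" means that the object's internal state is modified without disturbing the structure of the container, then you can accomplish what you want just by locking the record object.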
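As an illustration of locking the record itself, here is a small sketch. The `Account` record type, the `RecordLockDemo` class, and its method names are all invented for the example; the container is a `ConcurrentHashMap` only so that lookups are safe without a container-wide lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical example: each record is a mutable object used as its own monitor.
final class RecordLockDemo {
    // Hypothetical mutable record; only touched while holding its monitor.
    static final class Account {
        long balance;
    }

    private final Map<String, Account> accounts = new ConcurrentHashMap<>();

    void open(String id) {
        accounts.putIfAbsent(id, new Account());
    }

    void deposit(String id, long amount) {
        Account account = lookup(id);
        synchronized (account) { // locks this record only, not the whole map
            account.balance += amount;
        }
    }

    long balanceOf(String id) {
        Account account = lookup(id);
        synchronized (account) {
            return account.balance;
        }
    }

    private Account lookup(String id) {
        Account account = accounts.get(id);
        if (account == null) {
            throw new IllegalArgumentException("unknown record: " + id);
        }
        return account;
    }
}
```

Updates to different records proceed in parallel, while updates to the same record are serialized. This only works as long as the record object is never swapped out of the container.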

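However, if "update" means removing the record object from the container and replacing it, then you must lock the entire container to prevent other threads from seeing it in an inconsistent state.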

In either case, you should look at the classes in the java.util.concurrent package.
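For example, if the container is an in-memory map, `ConcurrentHashMap.compute` performs the read-and-replace for a single key atomically without a map-wide lock in user code. The sketch below assumes immutable `Integer` values and uses a made-up class name, `AtomicReplaceDemo`; it does not address the remote-store case from the question.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical example: replace the value for one key atomically.
final class AtomicReplaceDemo {
    private final ConcurrentHashMap<String, Integer> store = new ConcurrentHashMap<>();

    // Replaces the (immutable) value for the key with a new one.
    // compute() runs the function atomically with respect to other updates of that key.
    int bump(String key) {
        return store.compute(key, (k, old) -> old == null ? 1 : old + 1);
    }

    // Returns the current value, or 0 if the key has never been bumped.
    int get(String key) {
        return store.getOrDefault(key, 0);
    }
}
```

The remapping function should stay short and should not touch other keys of the same map, so this fits cheap in-memory updates better than slow remote calls.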

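I've written a class that can lock on any key dynamically. It uses a static ConcurrentHashMap, but if no locks are in use, the map is empty. The syntax can be confusing, because we create a new object based on the key. On unlock, it cleans up the lock if it is no longer used. It is guaranteed that any two DynamicKeyLock instances created from two equal keys (by equals/hashCode) will lock each other out.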

See the implementations for Java 8 and Java 6, and a small test.

Java 8:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class DynamicKeyLock<T> implements Lock {
        // Shared by all instances; holds an entry only while a key is locked or being waited on.
        private final static ConcurrentHashMap<Object, LockAndCounter> locksMap = new ConcurrentHashMap<>();

        private final T key;

        public DynamicKeyLock(T lockKey) {
            this.key = lockKey;
        }

        private static class LockAndCounter {
            private final Lock lock = new ReentrantLock();
            // Number of threads currently holding or waiting for this lock.
            private final AtomicInteger counter = new AtomicInteger(0);
        }

        // Atomically creates the entry if needed and registers the caller in the counter.
        private LockAndCounter getLock() {
            return locksMap.compute(key, (key, lockAndCounterInner) -> {
                if (lockAndCounterInner == null) {
                    lockAndCounterInner = new LockAndCounter();
                }
                lockAndCounterInner.counter.incrementAndGet();
                return lockAndCounterInner;
            });
        }

        // Deregisters the caller and removes the entry once nobody uses it.
        private void cleanupLock(LockAndCounter lockAndCounterOuter) {
            if (lockAndCounterOuter.counter.decrementAndGet() == 0) {
                locksMap.compute(key, (key, lockAndCounterInner) -> {
                    if (lockAndCounterInner == null || lockAndCounterInner.counter.get() == 0) {
                        return null;
                    }
                    return lockAndCounterInner;
                });
            }
        }

        @Override
        public void lock() {
            LockAndCounter lockAndCounter = getLock();
            lockAndCounter.lock.lock();
        }

        @Override
        public void unlock() {
            // Assumes the key is currently locked; otherwise the map has no entry for it.
            LockAndCounter lockAndCounter = locksMap.get(key);
            lockAndCounter.lock.unlock();
            cleanupLock(lockAndCounter);
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            LockAndCounter lockAndCounter = getLock();
            try {
                lockAndCounter.lock.lockInterruptibly();
            } catch (InterruptedException e) {
                cleanupLock(lockAndCounter);
                throw e;
            }
        }

        @Override
        public boolean tryLock() {
            LockAndCounter lockAndCounter = getLock();
            boolean acquired = lockAndCounter.lock.tryLock();
            if (!acquired) {
                cleanupLock(lockAndCounter);
            }
            return acquired;
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            LockAndCounter lockAndCounter = getLock();
            boolean acquired;
            try {
                acquired = lockAndCounter.lock.tryLock(time, unit);
            } catch (InterruptedException e) {
                cleanupLock(lockAndCounter);
                throw e;
            }
            if (!acquired) {
                cleanupLock(lockAndCounter);
            }
            return acquired;
        }

        @Override
        public Condition newCondition() {
            // Assumes the key is currently locked; otherwise the map has no entry for it.
            LockAndCounter lockAndCounter = locksMap.get(key);
            return lockAndCounter.lock.newCondition();
        }
    }

Java 6:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class DynamicKeyLock<T> implements Lock {
        private final static ConcurrentHashMap<Object, LockAndCounter> locksMap =
                new ConcurrentHashMap<Object, LockAndCounter>();

        private final T key;

        public DynamicKeyLock(T lockKey) {
            this.key = lockKey;
        }

        private static class LockAndCounter {
            private final Lock lock = new ReentrantLock();
            private final AtomicInteger counter = new AtomicInteger(0);
        }

        private LockAndCounter getLock() {
            while (true) { // Try to init lock
                LockAndCounter lockAndCounter = locksMap.get(key);
                if (lockAndCounter == null) {
                    LockAndCounter newLock = new LockAndCounter();
                    lockAndCounter = locksMap.putIfAbsent(key, newLock);
                    if (lockAndCounter == null) {
                        lockAndCounter = newLock;
                    }
                }
                lockAndCounter.counter.incrementAndGet();
                synchronized (lockAndCounter) {
                    LockAndCounter lastLockAndCounter = locksMap.get(key);
                    if (lockAndCounter == lastLockAndCounter) {
                        return lockAndCounter;
                    }
                    // else some other thread beat us to it, thus try again.
                }
            }
        }

        private void cleanupLock(LockAndCounter lockAndCounter) {
            if (lockAndCounter.counter.decrementAndGet() == 0) {
                synchronized (lockAndCounter) {
                    if (lockAndCounter.counter.get() == 0) {
                        locksMap.remove(key);
                    }
                }
            }
        }

        @Override
        public void lock() {
            LockAndCounter lockAndCounter = getLock();
            lockAndCounter.lock.lock();
        }

        @Override
        public void unlock() {
            LockAndCounter lockAndCounter = locksMap.get(key);
            lockAndCounter.lock.unlock();
            cleanupLock(lockAndCounter);
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            LockAndCounter lockAndCounter = getLock();
            try {
                lockAndCounter.lock.lockInterruptibly();
            } catch (InterruptedException e) {
                cleanupLock(lockAndCounter);
                throw e;
            }
        }

        @Override
        public boolean tryLock() {
            LockAndCounter lockAndCounter = getLock();
            boolean acquired = lockAndCounter.lock.tryLock();
            if (!acquired) {
                cleanupLock(lockAndCounter);
            }
            return acquired;
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            LockAndCounter lockAndCounter = getLock();
            boolean acquired;
            try {
                acquired = lockAndCounter.lock.tryLock(time, unit);
            } catch (InterruptedException e) {
                cleanupLock(lockAndCounter);
                throw e;
            }
            if (!acquired) {
                cleanupLock(lockAndCounter);
            }
            return acquired;
        }

        @Override
        public Condition newCondition() {
            LockAndCounter lockAndCounter = locksMap.get(key);
            return lockAndCounter.lock.newCondition();
        }
    }

Test:

    import java.util.concurrent.atomic.AtomicBoolean;

    import org.junit.Assert;
    import org.junit.Test;

    public class DynamicKeyLockTest {

        @Test
        public void testDifferentKeysDontLock() throws InterruptedException {
            DynamicKeyLock<Object> lock = new DynamicKeyLock<>(new Object());
            lock.lock();
            AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
            try {
                new Thread(() -> {
                    DynamicKeyLock<Object> anotherLock = new DynamicKeyLock<>(new Object());
                    anotherLock.lock();
                    try {
                        anotherThreadWasExecuted.set(true);
                    } finally {
                        anotherLock.unlock();
                    }
                }).start();
                Thread.sleep(100);
            } finally {
                // A different key must not be blocked by the lock held above.
                Assert.assertTrue(anotherThreadWasExecuted.get());
                lock.unlock();
            }
        }

        @Test
        public void testSameKeysLock() throws InterruptedException {
            Object key = new Object();
            DynamicKeyLock<Object> lock = new DynamicKeyLock<>(key);
            lock.lock();
            AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
            try {
                new Thread(() -> {
                    DynamicKeyLock<Object> anotherLock = new DynamicKeyLock<>(key);
                    anotherLock.lock();
                    try {
                        anotherThreadWasExecuted.set(true);
                    } finally {
                        anotherLock.unlock();
                    }
                }).start();
                Thread.sleep(100);
            } finally {
                // The same key must stay blocked until the lock above is released.
                Assert.assertFalse(anotherThreadWasExecuted.get());
                lock.unlock();
            }
        }
    }