锁定任意键的处理程序

我有代码实现任意键的“锁定处理程序”。 给定一个key ,它确保一次只有一个线程可以process该(或等于)密钥(这意味着调用externalSystem.process(key)调用)。

到目前为止,我有这样的代码:

 public class MyHandler { private final SomeWorkExecutor someWorkExecutor; private final ConcurrentHashMap lockMap = new ConcurrentHashMap(); public void handle(Key key) { // This can lead to OOM as it creates locks without removing them Lock keyLock = lockMap.computeIfAbsent( key, (k) -> new ReentrantLock() ); keyLock.lock(); try { someWorkExecutor.process(key); } finally { keyLock.unlock(); } } } 

据我所知,这段代码可能会导致OutOfMemoryError因为没有一个清晰的地图。

我想如何制作将积累有限数量的元素的地图。 当超过限制时,我们应该用new替换最旧的访问元素(此代码应与作为监视器的最旧元素同步)。 但是我不知道怎么回调会说我超出限制。

请分享你的想法。

PS

我重读了任务,现在我发现我有限制, handle方法不能被调用超过8个线程。 我不知道它对我有什么帮助,但我刚提到它。

PS2

通过@Boris,Spider被认为是一个简单明了的解决方案:

 } finally { lockMap.remove(key); keyLock.unlock(); } 

但是在Boris注意到代码之后我们没有线程安全,因为它破坏了行为:
让研究用同样的关键调用3个线程:

  1. 线程#1获取锁定,现在在map.remove(key);之前map.remove(key);
  2. 线程#2使用等号键调用,因此它在线程#1释放锁定时等待。
  3. 然后线程#1执行map.remove(key); 。 在这个线程#3之后调用方法handle 。 它会检查映射中是否存在此键的锁定,因此它会创建新锁并获取它。
  4. 线程#1释放锁定,因此线程#2获取它。
    因此,对于等号键,可以并行调用线程#2和线程#3。 但不应该允许。

为了避免这种情况,在映射清除之前,我们应该阻止任何线程获取锁,而waitset的所有线程都没有获取并释放锁。 看起来需要足够复杂的同步,这将导致算法运算缓慢。 也许我们应该在地图大小超过某些有限值时不时清除地图。

我浪费了很多时间,但不幸的是我没有想法如何实现这一目标。

您不需要尝试将大小限制为某个任意值 – 事实certificate,您可以完成这种“锁定处理程序”习惯用法,同时仅准确存储当前锁定在地图中的键数。

我们的想法是使用一个简单的约定:成功地映射添加到映射计为“锁定”操作,并将其删除计为“解锁”操作。 这巧妙地避免了在某些线程仍然锁定并且其他竞争条件时删除映射的问题。

此时,映射中的value仅用于阻止使用相同密钥到达的其他线程,并且需要等到映射被删除。

这是一个使用CountDownLatch而不是Lock作为地图值的示例1

 public void handle(Key key) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); // try to acquire the lock by inserting our latch as a // mapping for key while(true) { CountDownLatch existing = lockMap.putIfAbsent(key, latch); if (existing != null) { // there is an existing key, wait on it existing.await(); } else { break; } } try { externalSystem.process(key); } finally { lockMap.remove(key); latch.countDown(); } } 

这里,映射的生命周期只有保持锁定。 与不同密钥的并发请求相比,映射的条目永远不会多。

与您的方法的不同之处在于映射不会“重复使用” – 每个handle调用都会创建一个新的锁存器和映射。 由于您已经在进行昂贵的primefaces操作,因此在实践中这可能不会太慢​​。 另一个缺点是,在许多等待线程中,当锁存器倒计时所有线程都会被唤醒,但只有一个会成功地将新映射放入并因此获取锁定 – 其余的则重新进入新锁定状态。

可以构建另一个版本,当线程出现并等待现有映射时重新使用映射。 基本上,解锁线程只是对其中一个等待线程进行“切换”。 只有一个映射将用于等待同一个键的整个线程集 – 它按顺序传递给每个线程。 大小仍然有限,因为没有更多的线程正在等待给定的映射,它仍然被删除。

要实现它,可以使用可以计算等待线程数的映射值替换CountDownLatch 。 当线程执行解锁时,它首先检查是否有任何线程在等待,如果是,则唤醒一个线程进行切换。 如果没有线程在等待,它会“销毁”该对象(即,设置该对象不再位于映射中的标志)并将其从地图中删除。

你需要在适当的锁定下进行上述操作,并且有一些棘手的细节。 在实践中,我发现上面简短而又甜蜜的例子很有效。


1动态编写,未编译且未经测试,但该想法有效。

您可以依赖方法compute(K key, BiFunction remappingFunction)来同步对给定键的方法process调用,您甚至不再需要使用Lock作为地图值的类型,因为你不再依赖它了。

我们的想法是依靠ConcurrentHashMap的内部锁定机制来执行您的方法,这将允许线程并行执行其相应散列不属于同一个bin的键的process方法。 这相当于基于条带锁的方法,除了您不需要额外的第三方库。

条带锁的方法很有意思,因为它在内存占用方面非常轻,因为你只需要有限量的锁就可以了,所以你的锁所需的内存占用是已知的并且永远不会改变,这不是对每个密钥使用一个锁的方法(如在您的问题中),这样通常更好/建议使用基于条带锁的方法来满足此类需求。

所以你的代码可能是这样的:

 // This will create a ConcurrentHashMap with an initial table size of 16 // bins by default, you may provide an initialCapacity and loadFactor // if too much or not enough to get the expected table size in order // increase or reduce the concurrency level of your map // NB: We don't care much of the type of the value so I arbitrarily // used Void but it could be any type like simply Object private final ConcurrentMap lockMap = new ConcurrentHashMap<>(); public void handle(Key lockKey) { // Execute the method process through the remapping Function lockMap.compute( lockKey, (key, value) -> { // Execute the process method under the protection of the // lock of the bin of hashes corresponding to the key someWorkExecutor.process(key); // Returns null to keep the Map empty return null; } ); } 

注意1:由于我们总是返回null因此地图将始终为空,这样您就不会因为此地图而耗尽内存。

注意2:由于我们从不影响给定键的值,请注意它也可以使用computeIfAbsent(K key, Function mappingFunction)方法computeIfAbsent(K key, Function mappingFunction)

 public void handle(Key lockKey) { // Execute the method process through the remapping Function lockMap.computeIfAbsent( lockKey, key -> { // Execute the process method under the protection of the // lock of the segment of hashes corresponding to the key someWorkExecutor.process(key); // Returns null to keep the Map empty return null; } ); } 

注意3:确保您的方法process永远不会调用任何键的方法handle ,因为您最终会遇到无限循环(相同的键)或死锁(其他非有序键,例如:如果一个线程调用handle(key1) ,然后process内部调用handle(key2) ,另一个线程调用并行handle(key2)然后process内部调用handle(key1) ,无论采用何种方法,都会遇到死锁。 此行为并非特定于此方法,它将以任何方法发生。

一种方法是完全省去并发哈希映射,只需使用带锁定的常规HashMap来执行映射所需的操作并以primefaces方式锁定状态。

乍一看,这似乎降低了系统的并发性,但是如果我们假设process(key)调用相对于非常快速的锁操作而言是冗长的,那么它运行良好,因为process()调用仍然并发运行。 在专属关键部分中只会进行少量且固定的工作。

这是一个草图:

 public class MyHandler { private static class LockHolder { ReentrantLock lock = new ReentrantLock(); int refcount = 0; void lock(){ lock.lock(); } } private final SomeWorkExecutor someWorkExecutor; private final Lock mapLock = new ReentrantLock(); private final HashMap lockMap = new HashMap<>(); public void handle(Key key) { // lock the map mapLock.lock(); LockHolder holder = lockMap.computeIfAbsent(key, k -> new LockHolder()); // the lock in holder is either unlocked (newly created by us), or an existing lock, let's increment refcount holder.refcount++; mapLock.unlock(); holder.lock(); try { someWorkExecutor.process(key); } finally { mapLock.lock() keyLock.unlock(); if (--holder.refcount == 0) { // no more users, remove lock holder map.remove(key); } mapLock.unlock(); } } } 

我们使用mapLock ,它只在共享的mapLock下操作,以跟踪锁的用户数。 每当refcount为零时,我们就可以在退出处理程序时删除该条目。 这种方法很好,因为它很容易推理,并且如果与锁定开销相比, process()调用相对昂贵,那么它将表现良好。 由于地图操作发生在共享锁下,因此添加其他逻辑也很简单,例如,在地图中保留一些Holder对象,跟踪统计信息等。

谢谢Ben Mane
我找到了这个变种。

 public class MyHandler { private final int THREAD_COUNT = 8; private final int K = 100; private final Striped striped = Striped.lazyWeakLock(THREAD_COUNT * K); private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor(); public void handle(Key key) throws InterruptedException { Lock keyLock = striped.get(key); keyLock.lock(); try { someWorkExecutor.process(key); } finally { keyLock.unlock(); } } } 

这是一个简短而又甜蜜的版本,它利用了Guava的Interner类的弱版本,大量提升了每个键用作锁的“规范”对象,并实现了弱引用语义,以便清除未使用的条目。

 public class InternerHandler { private final Interner = Interners.newWeakInterner(); public void handle(Key key) throws InterruptedException { Key canonKey = Interner.intern(key); synchronized (canonKey) { someWorkExecutor.process(key); } } } 

基本上我们要求一个规范的 canonKey ,它equal() key equal() ,然后锁定这个canonKey 。 每个人都会同意规范密钥,因此所有传递相同密钥的呼叫者都会同意锁定的对象。

Interner的弱特性意味着只要没有使用规范密钥,就可以删除条目,这样就可以避免在interner中累积条目。 稍后,如果再次出现相等的密钥,则选择新的规范条目。

上面的简单代码依赖于内置监视器进行synchronize – 但如果这对您不起作用(例如,它已经用于其他目的),您可以在Key类中包含一个锁定对象或创建一个holder对象。

 class MyHandler { private final Map lockMap = Collections.synchronizedMap(new WeakHashMap<>()); private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor(); public void handle(Key key) throws InterruptedException { Lock keyLock = lockMap.computeIfAbsent(key, (k) -> new ReentrantLock()); keyLock.lock(); try { someWorkExecutor.process(key); } finally { keyLock.unlock(); } } } 

每次为key创建和删除锁定对象在性能方面是一项代价高昂的操作。 当你从并发映射(比如缓存)添加/删除锁时,必须确保从缓存中放置/删除对象本身是线程安全的。 所以这似乎不是一个好主意,但可以通过ConcurrentHashMap实现

条带锁定方法(也由内部并发哈希映射使用)是更好的方法。 从Google Guava文档中可以解释为

如果要将锁与对象关联,则需要的关键保证是,如果key1.equals(key2),则与key1关联的锁与与key2关联的锁相同。

最简单的方法是将每个密钥与同一个锁相关联,这样可以实现最粗糙的同步。 另一方面,您可以将每个不同的密钥与不同的锁相关联,但这需要线性内存消耗和锁本身系统的并发管理,因为会发现新密钥。

Striped允许程序员选择多个锁,这些锁基于其哈希码在密钥之间分配。 这允许程序员动态选择并发和内存消耗之间的权衡,同时保留密钥不变量,如果key1.equals(key2),则stripe.get(key1)== striped.get(key2)

码:

 //declare globally; eg class field level Striped rwLockStripes = Striped.lock(16); Lock lock = rwLockStripes.get("key"); lock.lock(); try { // do you work here } finally { lock.unlock(); } 

剪切代码后可以帮助实现锁定的放置/删除。

 private ConcurrentHashMap caches = new ConcurrentHashMap<>(); public void processWithLock(String key) { ReentrantLock lock = findAndGetLock(key); lock.lock(); try { // do you work here } finally { unlockAndClear(key, lock); } } private void unlockAndClear(String key, ReentrantLock lock) { // *** Step 1: Release the lock. lock.unlock(); // *** Step 2: Attempt to remove the lock // This is done by calling compute method, if given lock is present in // cache. if current lock object in cache is same instance as 'lock' // then remove it from cache. If not, some other thread is succeeded in // putting new lock object and hence we can leave the removal of lock object to that // thread. caches.computeIfPresent(key, (k, current) -> lock == current ? null : current); } private ReentrantLock findAndGetLock(String key) { // Merge method given us the access to the previously( if available) and // newer lock object together. return caches.merge(key, new ReentrantLock(), (older, newer) -> nonNull(older) ? older : newer); } 

您可以尝试使用JKeyLockManager之类的东西,而不是自己编写 。 从项目描述:

JKeyLockManager使用特定于应用程序的密钥提供细粒度锁定。

网站上给出的示例代码:

 public class WeatherServiceProxy { private final KeyLockManager lockManager = KeyLockManagers.newManager(); public void updateWeatherData(String cityName, float temperature) { lockManager.executeLocked(cityName, () -> delegate.updateWeatherData(cityName, temperature)); } 

您致电时将添加新值

 lockMap.computeIfAbsent() 

因此,您只需检查lockMap.size()项目计数即可。

但是你如何找到第一个添加的项目? 最好只是在使用它们后删除它们。

您可以使用存储对象引用的进程内缓存,如Caffeine,Guava,EHCache或cache2k。 以下是如何使用cache2k构建缓存的示例 :

 final Cache locks = new Cache2kBuilder(){} .loader( new CacheLoader() { @Override public Lock load(Key o) { return new ReentrantLock(); } } ) .storeByReference(true) .entryCapacity(1000) .build(); 

使用模式与问题中的一样:

  Lock keyLock = locks.get(key); keyLock.lock(); try { externalSystem.process(key); } finally { keyLock.unlock(); } 

由于缓存限制为1000个条目,因此可以自动清除不再使用的锁。

如果应用程序中的容量和线程数不匹配,则可能会由缓存驱逐使用中的锁。 该解决方案在我们的应用中可以使用多年。 当存在足够长的运行任务且超出容量时,缓存将驱逐正在使用的锁。 在实际应用程序中,您始终可以控制生命线程的数量,例如在Web容器中,您可以将处理线程的数量限制为(示例)100。因此,您知道在使用中永远不会有超过100个锁。 如果考虑到这一点,该解决方案具有最小的开销。

请记住,只要应用程序在单个VM上运行,锁定就会起作用。 您可能需要查看分布式锁定管理器(DLM)。 提供分布式锁的产品示例:淡褐色,无影,赤土,红色/红色。