Java中是否存在一个到期的映射,它会在* first * insertion之后的一段时间后到期?

我试着看一下缓存机制,比如Guava的Cache 。 它们的到期时间仅为自上次更新以来。

我正在寻找的是一个数据结构,它存储密钥并在第一次插入后经过一段时间后清理密钥。 我计划将价值作为一些反击。

一个场景可能是一个沉默的工作者,它第一次做了一些工作但是在一段有效期内保持沉默 – 即使工作被要求。 如果在到期时间之后要求工作,他将完成工作。

知道这样的数据结构吗? 谢谢。

有几种选择。

被动清除

如果不需要在过期的密钥到期或按设定的时间间隔清理过期的密钥(即密钥在密钥到期时或在某个设置的时间间隔内不需要删除密钥),那么来自Apache Commons Collections的PassiveExpiringMap就是一个不错的选择。 尝试访问此映射中的密钥时,将检查密钥的生存时间(TTL)(所有密钥具有相同的TTL),如果密钥已过期,则会从映射中删除该密钥并返回null 。 此数据结构没有主动清理机制,因此只有在与密钥对应的TTL过期后访问它们时,才会删除过期的条目。

高速缓存

如果需要更多基于缓存的function(例如最大缓存容量和添加/删除侦听),Google Guava会提供CacheBuilder类。 这个类比Apache Commons替代方案更复杂,但它也提供了更多function。 如果这是为了更多基于缓存的应用程序,那么权衡可能是值得的。

螺纹拆卸

如果需要主动删除过期密钥,则可以生成一个单独的线程,负责删除过期的密钥。 在研究可能的实施结构之前,应该注意的是,这种方法的效果可能不如上述替代方案。 除了启动线程的开销之外,线程可能会导致与访问映射的客户端发生用。 例如,如果客户端想要访问密钥并且清理线程当前正在删除过期密钥,则客户端将阻止(如果使用同步)或具有不同的映射视图(包含哪些键值对)在地图中)如果采用某种并发机制。

话虽如此,使用这种方法很复杂,因为它要求TTL与密钥一起存储。 一种方法是创建一个ExpiringKey ,例如(每个键可以有自己的TTL;即使所有键最终都具有相同的TTL值,这种技术也不需要创建Map 装饰器或其他一些实现Map界面):

 public class ExpiringKey { private final T key; private final long expirationTimestamp; public ExpiringKey(T key, long ttlInMillis) { this.key = key; expirationTimestamp = System.currentTimeMillis() + ttlInMillis; } public T getKey() { return key; } public boolean isExpired() { return System.currentTimeMillis() > expirationTimestamp; } } 

现在地图的类型是Map, V>带有一些特定的KV类型值。 后台线程可以使用类似于以下内容的Runnable表示:

 public class ExpiredKeyRemover implements Runnable { private final Map, ?> map; public ExpiredKeyRemover(Map, ?> map) { this.map = map; } @Override public void run() { Iterator> it = map.keySet().iterator(); while (it.hasNext()) { if (it.next().isExpired()) { it.remove(); } } } } 

然后可以启动Runnable以便使用ScheduledExecutorService以固定间隔执行,如下所示(这将每隔5秒清理一次映射):

 Map, V> myMap = // ... ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); executor.scheduleAtFixedRate(new ExpiredKeyRemover(myMap), 0, 5, TimeUnit.SECONDS); 

值得注意的是,必须同步用于myMapMap实现或允许并发访问。 并发Map实现的挑战是ExpiredKeyRemover可能会看到与客户端不同的映射视图 ,如果清理线程未完成删除其他键(即使已删除),则可能会将过期密钥返回给客户端期望/过期的密钥,因为它的更改可能尚未提交)。 另外,上面的密钥删除代码可以使用流来实现,但是上面的代码仅用于说明逻辑而不是提供高性能的实现。

希望有所帮助。

创建了一个数据结构。 称之为DuplicateActionFilterByInsertTime

正确的想法是过滤重复的消息。 以下类过滤一段时间的插入时间( filterMillis )。

执行:

 public class DuplicateActionFilterByInsertTime { private static final Logger LOGGER = Logger.getLogger(DuplicateActionFilterByInsertTime.class.getName()); private final long filterMillis; private final ConcurrentHashMap actionMap = new ConcurrentHashMap<>(); private final ConcurrentLinkedQueue actionQueue = new ConcurrentLinkedQueue<>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final AtomicBoolean purgerRegistered = new AtomicBoolean(false); private final Set> listeners = ConcurrentHashMap.newKeySet(); public DuplicateActionFilterByInsertTime(int filterMillis) { this.filterMillis = filterMillis; } public SilenceInfo get(E e) { SilenceInfoImpl insertionData = actionMap.get(e); if (insertionData == null || insertionData.isExpired(filterMillis)) { return null; } return insertionData; } public boolean run(E e) { actionMap.computeIfPresent(e, (e1, insertionData) -> { int count = insertionData.incrementAndGet(); if (count == 2) { notifyFilteringStarted(e1); } return insertionData; }); boolean isNew = actionMap.computeIfAbsent(e, e1 -> { SilenceInfoImpl insertionData = new SilenceInfoImpl(); actionQueue.add(e1); return insertionData; }).getCount() == 1; tryRegisterPurger(); if (isNew) { e.run(); } return isNew; } private void tryRegisterPurger() { if (actionMap.size() != 0 && purgerRegistered.compareAndSet(false, true)) { scheduledExecutorService.schedule(() -> { try { for (Iterator iterator = actionQueue.iterator(); iterator.hasNext(); ) { E e = iterator.next(); SilenceInfoImpl insertionData = actionMap.get(e); if (insertionData == null || insertionData.isExpired(filterMillis)) { iterator.remove(); } if (insertionData != null && insertionData.isExpired(filterMillis)) { SilenceInfoImpl removed = actionMap.remove(e); FilteredItem filteredItem = new FilteredItem<>(e, removed); notifySilenceFinished(filteredItem); } else { // All the elements that were left shouldn't be purged. break; } } } finally { purgerRegistered.set(false); tryRegisterPurger(); } }, filterMillis, TimeUnit.MILLISECONDS); } } private void notifySilenceFinished(FilteredItem filteredItem) { new Thread(() -> listeners.forEach(l -> { try { l.onFilteringFinished(filteredItem); } catch (Exception e) { LOGGER.log(Level.WARNING, "Purge notification failed. Continuing to next one (if exists)", e); } })).start(); } private void notifyFilteringStarted(final E e) { new Thread(() -> listeners.forEach(l -> { try { l.onFilteringStarted(e); } catch (Exception e1) { LOGGER.log(Level.WARNING, "Silence started notification failed. Continuing to next one (if exists)", e1); } })).start(); } public void addListener(Listener listener) { listeners.add(listener); } public void removeLister(Listener listener) { listeners.remove(listener); } public interface SilenceInfo { long getInsertTimeMillis(); int getCount(); } public interface Listener { void onFilteringStarted(E e); void onFilteringFinished(FilteredItem filteredItem); } private static class SilenceInfoImpl implements SilenceInfo { private final long insertTimeMillis = System.currentTimeMillis(); private AtomicInteger count = new AtomicInteger(1); int incrementAndGet() { return count.incrementAndGet(); } @Override public long getInsertTimeMillis() { return insertTimeMillis; } @Override public int getCount() { return count.get(); } boolean isExpired(long expirationMillis) { return insertTimeMillis + expirationMillis < System.currentTimeMillis(); } } public static class FilteredItem { private final E item; private final SilenceInfo silenceInfo; FilteredItem(E item, SilenceInfo silenceInfo) { this.item = item; this.silenceInfo = silenceInfo; } public E getItem() { return item; } public SilenceInfo getSilenceInfo() { return silenceInfo; } } } 

测试示例:( 此处有更多测试)

 @Test public void testSimple() throws InterruptedException { int filterMillis = 100; DuplicateActionFilterByInsertTime expSet = new DuplicateActionFilterByInsertTime<>(filterMillis); AtomicInteger purgeCount = new AtomicInteger(0); expSet.addListener(new DuplicateActionFilterByInsertTime.Listener() { @Override public void onFilteringFinished(DuplicateActionFilterByInsertTime.FilteredItem filteredItem) { purgeCount.incrementAndGet(); } @Override public void onFilteringStarted(Runnable runnable) { } }); Runnable key = () -> { }; long beforeAddMillis = System.currentTimeMillis(); boolean added = expSet.run(key); long afterAddMillis = System.currentTimeMillis(); Assert.assertTrue(added); DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo = expSet.get(key); Assertions.assertThat(silenceInfo.getInsertTimeMillis()).isBetween(beforeAddMillis, afterAddMillis); expSet.run(key); DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo2 = expSet.get(key); Assert.assertEquals(silenceInfo.getInsertTimeMillis(), silenceInfo2.getInsertTimeMillis()); Assert.assertFalse(silenceInfo.getInsertTimeMillis() + filterMillis < System.currentTimeMillis()); Assert.assertEquals(silenceInfo.getCount(), 2); Thread.sleep(filterMillis); Assertions.assertThat(expSet.get(key)).isNull(); Assert.assertNull(expSet.get(key)); Thread.sleep(filterMillis * 2); // Give a chance to purge the items. Assert.assertEquals(1, purgeCount.get()); System.out.println("Finished"); } 

来源 。

Interesting Posts