我尝试查看缓存机制,例如 Guava 的Cache
. 它们的到期时间仅在上次更新后。
我正在寻找的是一种数据结构,它存储密钥并在第一次插入后经过一段时间后清理密钥。我计划将值设为一些计数器。
一个场景可能是一个沉默的工人,他第一次做一些工作,但在到期的一段时间内保持沉默——即使请求工作。如果在到期时间过后要求工作,他将完成工作。
知道这样的数据结构吗?谢谢。
我尝试查看缓存机制,例如 Guava 的Cache
. 它们的到期时间仅在上次更新后。
我正在寻找的是一种数据结构,它存储密钥并在第一次插入后经过一段时间后清理密钥。我计划将值设为一些计数器。
一个场景可能是一个沉默的工人,他第一次做一些工作,但在到期的一段时间内保持沉默——即使请求工作。如果在到期时间过后要求工作,他将完成工作。
知道这样的数据结构吗?谢谢。
有几个选择。
被动移除
如果不需要在过期密钥过期时或按设定的时间间隔清理过期密钥(即,当密钥过期或在某个设定的时间间隔内不需要删除密钥),那么来自 Apache Commons Collections 的PassiveExpiringMap是一个不错的选择。当尝试访问此映射中的键时,将检查键的生存时间 (TTL)(所有键具有相同的 TTL),如果键已过期,则将其从映射中删除并null
返回。这种数据结构没有主动清理机制,所以只有在key对应的TTL过期后才被访问,才会删除过期条目。
缓存
如果需要更多基于缓存的功能(例如最大缓存容量和添加/删除监听),Google Guava 提供了CacheBuilder类。这个类比 Apache Commons 替代品更复杂,但它也提供了更多的功能。如果这用于更多基于缓存的应用程序,则权衡可能是值得的。
螺纹拆卸
如果需要主动删除过期密钥,则可以生成一个单独的线程来负责删除过期密钥。在查看可能的实现结构之前,应该注意这种方法的性能可能不如上述替代方法。除了启动线程的开销外,线程可能会导致与访问地图的客户端发生争用。例如,如果客户端想要访问一个键并且清理线程当前正在删除过期的键,客户端将阻塞(如果使用同步)或拥有不同的映射视图(包含哪些键值对在地图中)如果采用了一些并发机制。
话虽如此,使用这种方法很复杂,因为它需要将 TTL 与密钥一起存储。一种方法是创建一个ExpiringKey
,例如(每个键都可以有自己的 TTL;即使所有键最终都具有相同的 TTL 值,这种技术也不需要创建Map
装饰器或Map
接口的其他一些实现) :
public class ExpiringKey<T> {
private final T key;
private final long expirationTimestamp;
public ExpiringKey(T key, long ttlInMillis) {
this.key = key;
expirationTimestamp = System.currentTimeMillis() + ttlInMillis;
}
public T getKey() {
return key;
}
public boolean isExpired() {
return System.currentTimeMillis() > expirationTimestamp;
}
}
现在地图的类型将Map<ExpiringKey<K>, V>
具有一些特定K
的V
类型值。后台线程可以使用Runnable
类似于以下内容的 a 来表示:
public class ExpiredKeyRemover implements Runnable {
private final Map<ExpiringKey<?>, ?> map;
public ExpiredKeyRemover(Map<ExpiringKey<?>, ?> map) {
this.map = map;
}
@Override
public void run() {
Iterator<ExpiringKey<?>> it = map.keySet().iterator();
while (it.hasNext()) {
if (it.next().isExpired()) {
it.remove();
}
}
}
}
然后Runnable
可以启动它,使其以固定间隔执行,ScheduledExecutorService
如下所示(每 5 秒清理一次地图):
Map<ExpiringKey<K>, V> myMap = // ...
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(new ExpiredKeyRemover(myMap), 0, 5, TimeUnit.SECONDS);
需要注意的是,Map
用于的实现myMap
必须同步或允许并发访问。并发Map
实现ExpiredKeyRemover
的挑战在于,如果清理线程未完成删除其他键(即使它已删除所需的/expired 键,因为它的更改可能尚未提交)。此外,上面的密钥删除代码可以使用流来实现,但上面的代码只是用来说明逻辑,而不是提供高性能的实现。
希望有帮助。
您可以使用ExpiringMap。这将在初始化 Map 时指定时间后从地图中删除元素。这是语法
public static Map<String, Long> threatURLCacheMap = ExpiringMap.builder().expiration(5, TimeUnit.MINUTES).build();
这将创建一个 Map ,其中每个元素将在插入 5 分钟后过期。您可以将此依赖项用于您的 maven 项目 net.jodah.expiringmap。这是了解更多信息的链接 https://crunchify.com/how-to-use-expiringmap-maven-java-utility-to-remove-expired-objects-from-map-automatically-complete-java-tutorial/
创建了一个数据结构。叫它DuplicateActionFilterByInsertTime
。
正确的概念是过滤重复消息。以下类从插入时间过滤一段时间(filterMillis
)。
执行:
public class DuplicateActionFilterByInsertTime<E extends Runnable> {
private static final Logger LOGGER = Logger.getLogger(DuplicateActionFilterByInsertTime.class.getName());
private final long filterMillis;
private final ConcurrentHashMap<E, SilenceInfoImpl> actionMap = new ConcurrentHashMap<>();
private final ConcurrentLinkedQueue<E> actionQueue = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final AtomicBoolean purgerRegistered = new AtomicBoolean(false);
private final Set<Listener<E>> listeners = ConcurrentHashMap.newKeySet();
public DuplicateActionFilterByInsertTime(int filterMillis) {
this.filterMillis = filterMillis;
}
public SilenceInfo get(E e) {
SilenceInfoImpl insertionData = actionMap.get(e);
if (insertionData == null || insertionData.isExpired(filterMillis)) {
return null;
}
return insertionData;
}
public boolean run(E e) {
actionMap.computeIfPresent(e, (e1, insertionData) -> {
int count = insertionData.incrementAndGet();
if (count == 2) {
notifyFilteringStarted(e1);
}
return insertionData;
});
boolean isNew = actionMap.computeIfAbsent(e, e1 -> {
SilenceInfoImpl insertionData = new SilenceInfoImpl();
actionQueue.add(e1);
return insertionData;
}).getCount() == 1;
tryRegisterPurger();
if (isNew) {
e.run();
}
return isNew;
}
private void tryRegisterPurger() {
if (actionMap.size() != 0 && purgerRegistered.compareAndSet(false, true)) {
scheduledExecutorService.schedule(() -> {
try {
for (Iterator<E> iterator = actionQueue.iterator(); iterator.hasNext(); ) {
E e = iterator.next();
SilenceInfoImpl insertionData = actionMap.get(e);
if (insertionData == null || insertionData.isExpired(filterMillis)) {
iterator.remove();
}
if (insertionData != null && insertionData.isExpired(filterMillis)) {
SilenceInfoImpl removed = actionMap.remove(e);
FilteredItem<E> filteredItem = new FilteredItem<>(e, removed);
notifySilenceFinished(filteredItem);
} else {
// All the elements that were left shouldn't be purged.
break;
}
}
} finally {
purgerRegistered.set(false);
tryRegisterPurger();
}
}, filterMillis, TimeUnit.MILLISECONDS);
}
}
private void notifySilenceFinished(FilteredItem<E> filteredItem) {
new Thread(() -> listeners.forEach(l -> {
try {
l.onFilteringFinished(filteredItem);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Purge notification failed. Continuing to next one (if exists)", e);
}
})).start();
}
private void notifyFilteringStarted(final E e) {
new Thread(() -> listeners.forEach(l -> {
try {
l.onFilteringStarted(e);
} catch (Exception e1) {
LOGGER.log(Level.WARNING, "Silence started notification failed. Continuing to next one (if exists)", e1);
}
})).start();
}
public void addListener(Listener<E> listener) {
listeners.add(listener);
}
public void removeLister(Listener<E> listener) {
listeners.remove(listener);
}
public interface SilenceInfo {
long getInsertTimeMillis();
int getCount();
}
public interface Listener<E> {
void onFilteringStarted(E e);
void onFilteringFinished(FilteredItem<E> filteredItem);
}
private static class SilenceInfoImpl implements SilenceInfo {
private final long insertTimeMillis = System.currentTimeMillis();
private AtomicInteger count = new AtomicInteger(1);
int incrementAndGet() {
return count.incrementAndGet();
}
@Override
public long getInsertTimeMillis() {
return insertTimeMillis;
}
@Override
public int getCount() {
return count.get();
}
boolean isExpired(long expirationMillis) {
return insertTimeMillis + expirationMillis < System.currentTimeMillis();
}
}
public static class FilteredItem<E> {
private final E item;
private final SilenceInfo silenceInfo;
FilteredItem(E item, SilenceInfo silenceInfo) {
this.item = item;
this.silenceInfo = silenceInfo;
}
public E getItem() {
return item;
}
public SilenceInfo getSilenceInfo() {
return silenceInfo;
}
}
}
测试示例:(更多测试在这里)
@Test
public void testSimple() throws InterruptedException {
int filterMillis = 100;
DuplicateActionFilterByInsertTime<Runnable> expSet = new DuplicateActionFilterByInsertTime<>(filterMillis);
AtomicInteger purgeCount = new AtomicInteger(0);
expSet.addListener(new DuplicateActionFilterByInsertTime.Listener<Runnable>() {
@Override
public void onFilteringFinished(DuplicateActionFilterByInsertTime.FilteredItem<Runnable> filteredItem) {
purgeCount.incrementAndGet();
}
@Override
public void onFilteringStarted(Runnable runnable) {
}
});
Runnable key = () -> {
};
long beforeAddMillis = System.currentTimeMillis();
boolean added = expSet.run(key);
long afterAddMillis = System.currentTimeMillis();
Assert.assertTrue(added);
DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo = expSet.get(key);
Assertions.assertThat(silenceInfo.getInsertTimeMillis()).isBetween(beforeAddMillis, afterAddMillis);
expSet.run(key);
DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo2 = expSet.get(key);
Assert.assertEquals(silenceInfo.getInsertTimeMillis(), silenceInfo2.getInsertTimeMillis());
Assert.assertFalse(silenceInfo.getInsertTimeMillis() + filterMillis < System.currentTimeMillis());
Assert.assertEquals(silenceInfo.getCount(), 2);
Thread.sleep(filterMillis);
Assertions.assertThat(expSet.get(key)).isNull();
Assert.assertNull(expSet.get(key));
Thread.sleep(filterMillis * 2); // Give a chance to purge the items.
Assert.assertEquals(1, purgeCount.get());
System.out.println("Finished");
}
来源。