5

我尝试查看缓存机制,例如 Guava 的Cache. 它们的到期时间仅在上次更新后。

我正在寻找的是一种数据结构,它存储密钥并在第一次插入后经过一段时间后清理密钥。我计划将值设为一些计数器。

一个场景可能是一个沉默的工人,他第一次做一些工作,但在到期的一段时间内保持沉默——即使请求工作。如果在到期时间过后要求工作,他将完成工作。

知道这样的数据结构吗?谢谢。

4

3 回答 3

8

有几个选择。

被动移除

如果不需要在过期密钥过期时或按设定的时间间隔清理过期密钥(即,当密钥过期或在某个设定的时间间隔内不需要删除密钥),那么来自 Apache Commons Collections 的PassiveExpiringMap是一个不错的选择。当尝试访问此映射中的键时,将检查键的生存时间 (TTL)(所有键具有相同的 TTL),如果键已过期,则将其从映射中删除并null返回。这种数据结构没有主动清理机制,所以只有在key对应的TTL过期后才被访问,才会删除过期条目。

缓存

如果需要更多基于缓存的功能(例如最大缓存容量和添加/删除监听),Google Guava 提供了CacheBuilder类。这个类比 Apache Commons 替代品更复杂,但它也提供了更多的功能。如果这用于更多基于缓存的应用程序,则权衡可能是值得的。

螺纹拆卸

如果需要主动删除过期密钥,则可以生成一个单独的线程来负责删除过期密钥。在查看可能的实现结构之前,应该注意这种方法的性能可能不如上述替代方法。除了启动线程的开销外,线程可能会导致与访问地图的客户端发生争用。例如,如果客户端想要访问一个键并且清理线程当前正在删除过期的键,客户端将阻塞(如果使用同步)或拥有不同的映射视图(包含哪些键值对在地图中)如果采用了一些并发机制。

话虽如此,使用这种方法很复杂,因为它需要将 TTL 与密钥一起存储。一种方法是创建一个ExpiringKey,例如(每个键都可以有自己的 TTL;即使所有键最终都具有相同的 TTL 值,这种技术也不需要创建Map 装饰器Map接口的其他一些实现) :

public class ExpiringKey<T> {

    private final T key;
    private final long expirationTimestamp;

    public ExpiringKey(T key, long ttlInMillis) {
        this.key = key;
        expirationTimestamp = System.currentTimeMillis() + ttlInMillis;
    }

    public T getKey() {
        return key;
    }

    public boolean isExpired() {
        return System.currentTimeMillis() > expirationTimestamp;
    }
}

现在地图的类型将Map<ExpiringKey<K>, V>具有一些特定KV类型值。后台线程可以使用Runnable类似于以下内容的 a 来表示:

public class ExpiredKeyRemover implements Runnable {

    private final Map<ExpiringKey<?>, ?> map;

    public ExpiredKeyRemover(Map<ExpiringKey<?>, ?> map) {
        this.map = map;
    }

    @Override
    public void run() {
        Iterator<ExpiringKey<?>> it = map.keySet().iterator();
        while (it.hasNext()) {
            if (it.next().isExpired()) {
                it.remove();
            }
        }
    }
}

然后Runnable可以启动它,使其以固定间隔执行,ScheduledExecutorService如下所示(每 5 秒清理一次地图):

Map<ExpiringKey<K>, V> myMap = // ...

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(new ExpiredKeyRemover(myMap), 0, 5, TimeUnit.SECONDS);

需要注意的是,Map用于的实现myMap必须同步或允许并发访问。并发Map实现ExpiredKeyRemover的挑战在于,如果清理线程未完成删除其他键(即使它已删除所需的/expired 键,因为它的更改可能尚未提交)。此外,上面的密钥删除代码可以使用流来实现,但上面的代码只是用来说明逻辑,而不是提供高性能的实现。

希望有帮助。

于 2018-03-29T17:00:06.253 回答
2

您可以使用ExpiringMap。这将在初始化 Map 时指定时间后从地图中删除元素。这是语法

public static Map<String, Long> threatURLCacheMap = ExpiringMap.builder().expiration(5, TimeUnit.MINUTES).build();

这将创建一个 Map ,其中每个元素将在插入 5 分钟后过期。您可以将此依赖项用于您的 maven 项目 net.jodah.expiringmap。这是了解更多信息的链接 https://crunchify.com/how-to-use-expiringmap-maven-java-utility-to-remove-expired-objects-from-map-automatically-complete-java-tutorial/

于 2019-09-23T08:31:48.533 回答
0

创建了一个数据结构。叫它DuplicateActionFilterByInsertTime

正确的概念是过滤重复消息。以下类从插入时间过滤一段时间(filterMillis)。

执行:

public class DuplicateActionFilterByInsertTime<E extends Runnable> {

    private static final Logger LOGGER = Logger.getLogger(DuplicateActionFilterByInsertTime.class.getName());

    private final long filterMillis;

    private final ConcurrentHashMap<E, SilenceInfoImpl> actionMap = new ConcurrentHashMap<>();

    private final ConcurrentLinkedQueue<E> actionQueue = new ConcurrentLinkedQueue<>();

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    private final AtomicBoolean purgerRegistered = new AtomicBoolean(false);

    private final Set<Listener<E>> listeners = ConcurrentHashMap.newKeySet();

    public DuplicateActionFilterByInsertTime(int filterMillis) {
        this.filterMillis = filterMillis;
    }

    public SilenceInfo get(E e) {
        SilenceInfoImpl insertionData = actionMap.get(e);
        if (insertionData == null || insertionData.isExpired(filterMillis)) {
            return null;
        }
        return insertionData;
    }

    public boolean run(E e) {
        actionMap.computeIfPresent(e, (e1, insertionData) -> {
            int count = insertionData.incrementAndGet();
            if (count == 2) {
                notifyFilteringStarted(e1);
            }
            return insertionData;
        });
        boolean isNew = actionMap.computeIfAbsent(e, e1 -> {
            SilenceInfoImpl insertionData = new SilenceInfoImpl();
            actionQueue.add(e1);
            return insertionData;
        }).getCount() == 1;

        tryRegisterPurger();

        if (isNew) {
            e.run();
        }
        return isNew;
    }

    private void tryRegisterPurger() {
        if (actionMap.size() != 0 && purgerRegistered.compareAndSet(false, true)) {
            scheduledExecutorService.schedule(() -> {
                try {
                    for (Iterator<E> iterator = actionQueue.iterator(); iterator.hasNext(); ) {
                        E e = iterator.next();
                        SilenceInfoImpl insertionData = actionMap.get(e);
                        if (insertionData == null || insertionData.isExpired(filterMillis)) {
                            iterator.remove();
                        }
                        if (insertionData != null && insertionData.isExpired(filterMillis)) {
                            SilenceInfoImpl removed = actionMap.remove(e);
                            FilteredItem<E> filteredItem = new FilteredItem<>(e, removed);
                            notifySilenceFinished(filteredItem);
                        } else {
                            // All the elements that were left shouldn't be purged.
                            break;
                        }
                    }
                } finally {
                    purgerRegistered.set(false);
                    tryRegisterPurger();
                }
            }, filterMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void notifySilenceFinished(FilteredItem<E> filteredItem) {
        new Thread(() -> listeners.forEach(l -> {
            try {
                l.onFilteringFinished(filteredItem);
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "Purge notification failed. Continuing to next one (if exists)", e);
            }
        })).start();
    }

    private void notifyFilteringStarted(final E e) {
        new Thread(() -> listeners.forEach(l -> {
            try {
                l.onFilteringStarted(e);
            } catch (Exception e1) {
                LOGGER.log(Level.WARNING, "Silence started notification failed. Continuing to next one (if exists)", e1);
            }
        })).start();
    }

    public void addListener(Listener<E> listener) {
        listeners.add(listener);
    }

    public void removeLister(Listener<E> listener) {
        listeners.remove(listener);
    }

    public interface SilenceInfo {
        long getInsertTimeMillis();

        int getCount();
    }

    public interface Listener<E> {
        void onFilteringStarted(E e);
        void onFilteringFinished(FilteredItem<E> filteredItem);
    }

    private static class SilenceInfoImpl implements SilenceInfo {
        private final long insertTimeMillis = System.currentTimeMillis();
        private AtomicInteger count = new AtomicInteger(1);

        int incrementAndGet() {
            return count.incrementAndGet();
        }

        @Override
        public long getInsertTimeMillis() {
            return insertTimeMillis;
        }

        @Override
        public int getCount() {
            return count.get();
        }

        boolean isExpired(long expirationMillis) {
            return insertTimeMillis + expirationMillis < System.currentTimeMillis();
        }
    }

    public static class FilteredItem<E> {
        private final E item;
        private final SilenceInfo silenceInfo;

        FilteredItem(E item, SilenceInfo silenceInfo) {
            this.item = item;
            this.silenceInfo = silenceInfo;
        }

        public E getItem() {
            return item;
        }

        public SilenceInfo getSilenceInfo() {
            return silenceInfo;
        }
    }
}

测试示例:(更多测试在这里

@Test
public void testSimple() throws InterruptedException {
    int filterMillis = 100;
    DuplicateActionFilterByInsertTime<Runnable> expSet = new DuplicateActionFilterByInsertTime<>(filterMillis);
    AtomicInteger purgeCount = new AtomicInteger(0);
    expSet.addListener(new DuplicateActionFilterByInsertTime.Listener<Runnable>() {
        @Override
        public void onFilteringFinished(DuplicateActionFilterByInsertTime.FilteredItem<Runnable> filteredItem) {
            purgeCount.incrementAndGet();
        }

        @Override
        public void onFilteringStarted(Runnable runnable) {
        }
    });

    Runnable key = () -> {
    };
    long beforeAddMillis = System.currentTimeMillis();
    boolean added = expSet.run(key);
    long afterAddMillis = System.currentTimeMillis();
    Assert.assertTrue(added);
    DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo = expSet.get(key);
    Assertions.assertThat(silenceInfo.getInsertTimeMillis()).isBetween(beforeAddMillis, afterAddMillis);

    expSet.run(key);
    DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo2 = expSet.get(key);
    Assert.assertEquals(silenceInfo.getInsertTimeMillis(), silenceInfo2.getInsertTimeMillis());

    Assert.assertFalse(silenceInfo.getInsertTimeMillis() + filterMillis < System.currentTimeMillis());
    Assert.assertEquals(silenceInfo.getCount(), 2);

    Thread.sleep(filterMillis);

    Assertions.assertThat(expSet.get(key)).isNull();

    Assert.assertNull(expSet.get(key));

    Thread.sleep(filterMillis * 2); // Give a chance to purge the items.
    Assert.assertEquals(1, purgeCount.get());

    System.out.println("Finished");
}

来源

于 2018-04-01T12:59:30.780 回答