I am trying to create a Caffeine cache for my Spring WebFlux application.
Here is the implementation:
private final Cache<String, OfferData> localCache = Caffeine.newBuilder()
        .expireAfterWrite(1, TimeUnit.MINUTES)
        .initialCapacity(1)
        .removalListener((RemovalListener<String, OfferData>) (key, value, cause) -> {
            log.debug(">> Caffeine Cache => Key: " + key + " | Value:" + value + " | Cause:" + cause);
        })
        .recordStats()
        .build();
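Note that I cannot use refreshAfterWrite here. It is only available on a LoadingCache, and I cannot use a LoadingCache because my loader function returns a Mono.
The loader function is as follows: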
@Retryable(value = {RuntimeException.class}, maxAttempts = 2,
        backoff = @Backoff(delayExpression = "${service.offer.local-cache-load-retry-delay}"))
private Mono<OfferData> loadLocalCache() {
    log.info(LOG_LOAD_LOCAL_CACHE + "Re-loading local offer cache...");
    log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from external cache...");
    return fetchOffersFromExternalCache()
            .doOnNext(offerData -> log.info(LOG_LOAD_LOCAL_CACHE + "Ads fetched successfully from external cache."))
            .onErrorResume(throwable -> {
                // Failed to load from the external cache. No data present in the cache.
                // Loading the data from Ads Manager to the cache
                log.error(LOG_LOAD_LOCAL_CACHE + "No offer data could be fetched from external cache: " + throwable);
                log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from source...");
                // If data cannot be retrieved from the source, we do not populate the internal cache with anything
                // for that key. We leave it blank for the next call to try and populate it.
                return fetchOffersFromSource()
                        .doOnNext(offerData -> storeOffersInExternalCache(offerData, offerServiceProperties.getExternalCacheExpiration()))
                        .doOnError(sourceThrowable -> log.error(LOG_LOAD_LOCAL_CACHE + "Offer data not available at source: {}", sourceThrowable.getMessage()));
            });
}
My implementation, which extracts the value from the Mono and then stores it in the cache:
public Mono<List<RuntimeOfferDetails>> getOffers(String eventName) {
    return CacheMono.lookup(
                    k -> Mono.justOrEmpty(localCache.getIfPresent(k)).map(Signal::next),
                    OFFER_DATA_KEY
            ).onCacheMissResume(this::loadLocalCache)
            .andWriteWith((k, sig) -> Mono.fromRunnable(() -> localCache.put(k, Objects.requireNonNull(sig.get()))))
            .map(offerData -> getOfferDetails(offerData, eventName))
            .map(offerDetailsList -> offerDetailsList.stream()
                    .map(RuntimeOfferDetails::toRuntimeOfferDetails)
                    .collect(Collectors.toList())
            ).doOnError(throwable -> {
                log.error(LOG_GET_OFFERS + "No offer data found in the local cache neither could it be retrieved from external cache or source: {}", throwable.getMessage());
                meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, FAILURE).increment();
            }).doOnSuccess(runtimeOfferDetails -> {
                log.info(LOG_GET_OFFERS + "Runtime offer details retrieved successfully.");
                log.debug(LOG_GET_OFFERS + "Runtime offer details: [{}]", runtimeOfferDetails);
                meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, SUCCESS).increment();
            }).switchIfEmpty(Mono.error(new OfferServiceException("No offer data found for the event: " + eventName)));
}
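I can control the cache with expireAfterWrite, but again, that only evicts keys from the in-memory cache. I want the same kind of control that refreshAfterWrite provides, but for a cache that is not a LoadingCache. How can I implement this?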
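
To make the behaviour I am after concrete, here is a minimal, library-free sketch of what I mean by "refresh after write": once the entry is older than a threshold, the caller still gets the stale value immediately while a single reload runs in the background. RefreshingValue, its constructor parameters, and the injected clock are hypothetical names for illustration only; they are not part of Caffeine or Reactor. The loader returns a CompletableFuture, which is what Mono.toFuture() would produce from loadLocalCache(), and as far as I understand it is also the shape Caffeine's AsyncLoadingCache (built with buildAsync) expects from its loader.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical single-entry holder with refresh-after-write semantics. */
final class RefreshingValue<T> {
    private final Supplier<CompletableFuture<T>> loader; // e.g. () -> someMono.toFuture()
    private final long refreshAfter;                     // same unit as the clock
    private final LongSupplier clock;                    // injected so behaviour is testable
    private final AtomicBoolean refreshing = new AtomicBoolean(false);
    private volatile T value;
    private volatile long writtenAt;

    RefreshingValue(Supplier<CompletableFuture<T>> loader, long refreshAfter, LongSupplier clock) {
        this.loader = loader;
        this.refreshAfter = refreshAfter;
        this.clock = clock;
    }

    CompletableFuture<T> get() {
        T current = value;
        if (current == null) {
            // Nothing cached yet: the caller waits for the first load.
            // Assumption: concurrent first callers may each trigger a load.
            return loader.get().thenApply(this::store);
        }
        boolean stale = clock.getAsLong() - writtenAt >= refreshAfter;
        if (stale && refreshing.compareAndSet(false, true)) {
            // Only one background reload at a time; a failed reload keeps the old value.
            try {
                loader.get().whenComplete((fresh, error) -> {
                    if (error == null && fresh != null) {
                        store(fresh);
                    }
                    refreshing.set(false);
                });
            } catch (RuntimeException e) {
                refreshing.set(false); // loader failed before returning a future
            }
        }
        // Serve the value read at the start of the call, even if it is stale.
        return CompletableFuture.completedFuture(current);
    }

    private T store(T fresh) {
        writtenAt = clock.getAsLong();
        value = fresh;
        return fresh;
    }
}
```

In my service this would presumably be wired as new RefreshingValue<>(() -> loadLocalCache().toFuture(), refreshNanos, System::nanoTime), with the result wrapped back via Mono.fromFuture(...). I would much rather get the same effect from Caffeine itself than maintain something like this by hand.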