1

我正在尝试为我的 spring webflux 应用程序创建一个咖啡因缓存。

这是实现:

private final Cache<String, OfferData> localCache = Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .initialCapacity(1)
            .removalListener((RemovalListener<String, OfferData>) (key, value, cause) -> {
                log.debug(">> Caffeine Cache => Key: " + key + " | Value:" + value + " | Cause:" + cause);
            })
            .recordStats()
            .build();

这里需要注意的是,我不能在refreshAfterWrite这里使用该物业。它仅适用于加载缓存,我不能使用加载缓存,因为加载器函数返回一个 Mono。

加载器功能如下:

@Retryable(value = {RuntimeException.class}, maxAttempts = 2, backoff = @Backoff(delayExpression = "${service.offer.local-cache-load-retry-delay}"))
    private Mono<OfferData> loadLocalCache() {
        log.info(LOG_LOAD_LOCAL_CACHE + "Re-loading local offer cache...");
        log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from external cache...");

        return fetchOffersFromExternalCache()
                .doOnNext(offerData -> log.info(LOG_LOAD_LOCAL_CACHE + "Ads fetched successfully from external cache."))
                .onErrorResume(throwable -> {
                    // Failed to load from the external cache. No data present in the cache.
                    // Loading the data from Ads Manager to the cache
                    log.error(LOG_LOAD_LOCAL_CACHE + "No offer data could be fetched from external cache: " + throwable);
                    log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from source...");

                    // If data cannot be retrieved from the source, we do not populate the internal cache with anything
                    // for that key. We leave it blank for the next call to try and populate it.
                    return fetchOffersFromSource()
                            .doOnNext(offerData -> storeOffersInExternalCache(offerData, offerServiceProperties.getExternalCacheExpiration()))
                            .doOnError(sourceThrowable -> log.error(LOG_LOAD_LOCAL_CACHE + "Offer data not available at source: {}", sourceThrowable.getMessage()));
                });
    }

我从 Mono 中提取值然后将其存储在缓存中的实现:

public Mono<List<RuntimeOfferDetails>> getOffers(String eventName) {
        return CacheMono.lookup(
                        k -> Mono.justOrEmpty(localCache.getIfPresent(k)).map(Signal::next),
                        OFFER_DATA_KEY
                ).onCacheMissResume(this::loadLocalCache)
                .andWriteWith((k, sig) -> Mono.fromRunnable(() -> localCache.put(k, Objects.requireNonNull(sig.get()))))
                .map(offerData -> getOfferDetails(offerData, eventName))
                .map(offerDetailsList -> offerDetailsList.stream()
                        .map(RuntimeOfferDetails::toRuntimeOfferDetails)
                        .collect(Collectors.toList())
                ).doOnError(throwable -> {
                    log.error(LOG_GET_OFFERS + "No offer data found in the local cache neither could it be retrieved from external cache or source: {}", throwable.getMessage());
                    meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, FAILURE).increment();
                }).doOnSuccess(runtimeOfferDetails -> {
                    log.info(LOG_GET_OFFERS + "Runtime offer details retrieved successfully.");
                    log.debug(LOG_GET_OFFERS + "Runtime offer details: [{}]", runtimeOfferDetails);
                    meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, SUCCESS).increment();
                }).switchIfEmpty(Mono.error(new OfferServiceException("No offer data found for the event: " + eventName)));
    }

我可以通过使用该expireAfterWrite属性来控制缓存,但同样,它只会从内存缓存中逐出键。我想要与refreshAfterWrite加载缓存以外的缓存相同的控件。我该如何实施?

4

0 回答 0