2

我有一个使用协程的 Kotlin JVM 服务器应用程序,我需要在非阻塞网络调用前面放置一个缓存。我想我可以使用咖啡因AsyncLoadingCache来获得我需要的非阻塞缓存行为。AsyncCacheLoader我需要实现的接口使用CompletableFuture. 同时,我要调用的加载缓存条目的方法是一个suspend函数。

我可以像这样弥合差距:

abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
        return GlobalScope.async(executor.asCoroutineDispatcher()) {
            load(key)
        }.asCompletableFuture()
    }
}

这将在提供的(默认情况下, the )上运行该load函数,从 Caffeine 的角度来看,这是正确的行为。ExecutorForkJoinPool

但是,我知道我应该尽量避免使用 GlobalScope 来启动 coroutines

我考虑让我的SuspendingCacheLoader实现CoroutineScope和管理自己的协程上下文。但CoroutineScope旨在由具有托管生命周期的对象实现。缓存和 都没有AsyncCacheLoader任何生命周期钩子。缓存拥有ExecutorCompletableFuture实例,因此它已经以这种方式控制了加载任务的生命周期。我看不出让协程上下文拥有任务会添加任何东西,而且我担心在缓存停止使用后我将无法正确关闭协程上下文。

编写自己的异步缓存机制非常困难,所以如果可以的话,我想与 Caffeine 实现集成。

是使用GlobalScope正确的方法来实现AsyncCacheLoader,还是有更好的解决方案?

4

5 回答 5

3

经过一番思考,我想出了一个更简单的解决方案,我认为它更习惯使用协程。

该方法通过使用AsyncCache.get(key, mappingFunction)而不是实现AsyncCacheLoader. 但是,按照此处其他一些答案的建议,它忽略了Executor缓存配置为使用的内容。

class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
    suspend fun get(key: K): V = supervisorScope {
        getAsync(key).await()
    }

    private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
        future { 
            loadValue(k) 
        }
    }

    private suspend fun loadValue(key: K): V = TODO("Load the value")
}

请注意,这取决于协程构建kotlinx-coroutines-jdk8futureawait()函数。

我认为忽略Executor可能是正确的选择。正如@Kiskae 指出的那样,缓存将ForkJoinPool默认使用。选择使用它而不是默认的协程调度程序可能没有用。但是,如果我们愿意,可以通过更改getAsync函数来轻松使用它:

private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
    future(executor.asCoroutineDispatcher()) { 
        loadValue(k) 
    }
}

于 2019-03-31T08:37:58.117 回答
3

缓存拥有 Executor 和 CompletableFuture 实例,因此它已经以这种方式控制了加载任务的生命周期。

这不是真的,文档上Caffeine指定它使用用户提供的,Executor或者ForkJoinPool.commonPool()如果没有提供。这意味着没有默认的生命周期。

不管直接调用GlobalScope似乎是错误的解决方案,因为没有理由硬编码选择。只需CoroutineScope通过构造函数提供 a 并GlobalScope用作参数,而您没有要绑定的缓存的显式生命周期。

于 2019-03-20T22:14:50.480 回答
1

建议这样的扩展方法

suspend inline fun <K: Any, V: Any> Caffeine<Any, Any>.suspendingLoadingCache(
    crossinline suspendedLoader: suspend (key: K) -> V
): AsyncLoadingCache<K, V> =
    buildAsync { key, executor: Executor ->
        CoroutineScope(executor.asCoroutineDispatcher()).future {
            suspendedLoader(key)
        }
    }

不推荐GlobalScope,使用CoroutineScope(executor.asCoroutineDispatcher())

future方法在kotlinx-coroutines-jdk8模块中定义

于 2020-09-27T11:23:18.623 回答
0

这是一个简单的解决方案。将 K、V 符号替换为您的类型。

    val cache = Caffeine.newBuilder().buildAsync<K, V> { key: K, _ ->
      val future = CompletableFuture<V>()

      launch {
        val result = someAwaitOperation(key)
        future.complete(result)
      }

      future
    }
于 2020-04-24T08:34:58.773 回答
0

这是我的解决方案:

定义一个扩展函数CoroutineVerticle

fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
    // do not use cache's executor
    future {
        loader(key)
    }
}

在里面创建我们的缓存CoroutineVerticle

val cache : AsyncLoadingCache<String, String> = buildCache({
  maximumSize(10_000)
  expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
    // load data and return it
    delay(1000)
    "data for key: $key"
}

使用缓存

suspend fun doSomething() {
    val data = cache.get('key').await()

    val future = cache.get('key2')
    val data2 = future.await()
}
于 2019-06-06T05:01:09.273 回答