19

我正在寻找一种优雅的方式来缓存异步操作的结果。

我首先有一个这样的同步方法:

public String GetStuff(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = request.GetResponse())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return sr.ReadToEnd();
}

然后我让它异步:

public async Task<String> GetStuffAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

然后我决定我应该缓存结果,所以我不需要经常在外面查询:

ConcurrentDictionary<String, String> _cache = new ConcurrentDictionary<String, String>();

public async Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, await GetStuffInternalAsync(url));
}

private async Task<String> GetStuffInternalAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

然后我读了一篇关于缓存如何Task<T>更好的文章(或者观看了视频),因为创建它们很昂贵:

ConcurrentDictionary<String, Task<String>> _cache = new ConcurrentDictionary<String, Task<String>>();

public Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, GetStuffInternalAsync(url));
}

private async Task<String> GetStuffInternalAsync(String url)
{
    WebRequest request = WebRequest.Create(url);
    using (var response = await request.GetResponseAsync())
    using (var sr = new StreamReader(response.GetResponseStream()))
        return await sr.ReadToEndAsync();
}

现在的问题是,如果请求失败(例如:HTTP 401),缓存将包含失败Task<String>,我将不得不重置应用程序,因为无法重新发送请求。

有没有一种优雅的方式ConcurrentDictionary<T1,T2>来只缓存成功的任务并且仍然具有原子行为?

4

6 回答 6

18

首先,您的两种方法都是错误的,因为它们不会为您节省任何请求(尽管第二种方法至少可以节省您的时间)。

您的第一个代码(带有 的代码await)执行此操作:

  1. 提出请求。
  2. 等待请求完成。
  3. 如果缓存中已经有结果,则忽略请求的结果。

您的第二个代码删除了第 2 步,因此速度更快,但您仍然会发出许多不必要的请求。

你应该做的是使用需要委托的重载GetOrAdd()

public Task<String> GetStuffAsync(String url)
{
    return _cache.GetOrAdd(url, GetStuffInternalAsync);
}

这并不能完全消除请求被忽略的可能性,但它确实降低了它们的可能性。(为此,您可以尝试取消您知道被忽略的请求,但我认为这不值得付出努力。)


现在到你的实际问题。我认为你应该做的是使用AddOrUpdate()方法。如果该值尚不存在,请添加它。如果它在那里,如果它有故障,请更换它:

public Task<String> GetStuffAsync(String url)
{
    return _cache.AddOrUpdate(
        url, GetStuffInternalAsync, (u, task) =>
        {
            if (task.IsCanceled || task.IsFaulted)
                return GetStuffInternalAsync(u);
            return task;
        });
}
于 2014-02-07T12:26:49.213 回答
7

将这些失败的任务保留为Negative Cache实际上是合理的(并且取决于您的设计和性能,至关重要)。否则,如果 aurl总是失败,则一次又一次地使用它会破坏完全使用缓存的意义。

您需要的是一种不时清除缓存的方法。最简单的方法是使用一个计时器来替换ConcurrentDictionarry实例。更强大的解决方案是构建自己的LruDictionary或类似的东西。

于 2014-02-07T12:44:21.417 回答
3

另一种简单的方法是扩展Lazy<T>为 be AsyncLazy<T>,如下所示:

public class AsyncLazy<T> : Lazy<Task<T>>
{
    public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) :
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode)
    { }

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}

然后你可以这样做:

private readonly ConcurrentDictionary<string, AsyncLazy<string>> _cache
    = new ConcurrentDictionary<string, AsyncLazy<string>>();

public async Task<string> GetStuffAsync(string url)
{
    return await _cache.GetOrAdd(url,
        new AsyncLazy<string>(
            () => GetStuffInternalAsync(url),
            LazyThreadSafetyMode.ExecutionAndPublication));
}
于 2016-01-17T03:33:15.473 回答
1

这是一种缓存异步操作结果的方法,可以保证没有缓存未命中。

在接受的答案中,如果在一个循环中(取决于 SynchronizationContext)或从多个线程多次请求相同的 url,则 Web 请求将继续发送,直到有响应被缓存,此时缓存将开始获取用过的。

下面的方法为每个唯一键创建一个SemaphoreSlim对象。这将防止长时间运行的异步操作为同一个键运行多次,同时允许它同时为不同的键运行。显然,保留 SemaphoreSlim 对象以防止缓存未命中是有开销的,因此根据用例的不同,这样做可能不值得。但是,如果保证没有缓存未命中比这更重要的话。

private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>();

public async Task<string> GetSomethingAsync(string key)
{   
    string value;
    // get the semaphore specific to this key
    var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
    await keyLock.WaitAsync();
    try
    {
        // try to get value from cache
        if (!_cache.TryGetValue(key, out value))
        {
            // if value isn't cached, get it the long way asynchronously
            value = await GetSomethingTheLongWayAsync();

            // cache value
            _cache.TryAdd(key, value);
        }
    }
    finally
    {
        keyLock.Release();
    }
    return value;
}

编辑:正如评论中提到的@mtkachenko,可以在此方法开始时执行额外的缓存检查,以可能跳过锁定获取步骤。

于 2016-01-17T02:08:45.637 回答
1

我已经为 MemoryCache 制作了一个包装器,它基本上可以缓存Lazy<Task<T>>对象并工作,以便解决以下所有问题:

  • 不会启动获取值的并行或不必要的操作。多个调用站点或线程可以等待缓存中的相同值。
  • 失败的任务不会被缓存。(没有负缓存。)
  • 缓存用户无法从缓存中获取无效结果,即使该值在等待期间无效。

该解决方案在我的博客中有进一步的解释,完整的工作代码可在GitHub 上找到

于 2016-04-15T09:09:17.950 回答
0

这对我有用:

ObjectCache _cache = MemoryCache.Default;
static object _lockObject = new object();
public Task<T> GetAsync<T>(string cacheKey, Func<Task<T>> func, TimeSpan? cacheExpiration = null) where T : class
{
    var task = (T)_cache[cacheKey];
    if (task != null) return task;          
    lock (_lockObject)
    {
        task = (T)_cache[cacheKey](cacheKey);
        if (task != null) return task;
        task = func();
        Set(cacheKey, task, cacheExpiration);
        task.ContinueWith(t => {
            if (t.Status != TaskStatus.RanToCompletion)
                _cache.Remove(cacheKey);
        });
    }
    return task;
} 
于 2015-12-11T11:18:27.260 回答