1

我正在寻找建立一个并行缓存。要求是需要一次触发n个数据收集器。这些数据收集器中的每一个都将到达边界层(称为服务层)并检索数据。但是,由于这是在同一个请求 (WCF) 中,如果 2 个数据收集器需要在服务层调用相同的方法,我不希望第二个请求等待第一个请求完成。

这需要对构建数据收集器的开发人员透明地构建(使用 Unity Interception 插入此缓存方面)。

这是流程的样子。反应式扩展是否适合这种设计?我过去没有与 Rx 合作过,并且不想在开发 10 天后碰壁。否则,async、await 和 events 的组合也可能在这里很好地发挥作用。

编辑:我使用 Rx 实现了这个 - 在多线程上下文中运行良好。有趣的是尝试 add 而不是 tryGet。(这是一个 Unity 拦截 CallHandler)

 /// <summary>
    /// Intercepts the calls and tries to retrieve from the cache
    /// </summary>
    class CacheCallHandler : ICallHandler
    {

        [Dependency]
        public ICache RequestCache { get; set; }

        public IMethodReturn Invoke(IMethodInvocation input, GetNextHandlerDelegate getNext)
        {
            IMethodReturn mesg = null;

            string cacheKey = CacheKeyGenerator.GetCacheKey(input);

            //create the task to retrieve the data
            var task = new Task<IMethodReturn>(() =>
            {
                return getNext()(input, getNext);
            });

            //make it observable
            var observableItem = task.ToObservable();

            //try to add it to the cache
            //we need to do this in the order of Add and then try to get, otherwise multiple thread might enter the same area
            if (RequestCache.TryAdd(cacheKey, observableItem))
            {
                //if the add succeeed, it means that we are responsible to starting this task
                task.Start();
            }
            else
            {
                if ( RequestCache.TryGetValue(cacheKey, out observableItem) )
                {
                    //do nothing, the observable item is already updated with the requried reference
                }
                else
                {
                    throw new CacheHandlerException("Could not add to cache AND could not retrieve from cache either. Something's wrong", input);
                }
            }

            //observe the return 
            if ( observableItem != null )
                mesg = observableItem.FirstOrDefault();

            if (mesg == null)
                throw new CacheHandlerException("Not return value found. this should not happen", input);

            return mesg;
        }


        /// <summary>
        /// Should always be the first to execute on the boundary
        /// </summary>
        public int Order
        {
            get { return 1; }
            set { ; }
        }
    }

在此处输入图像描述

4

3 回答 3

4

https://github.com/reactiveui/ReactiveUI/blob/master/ReactiveUI/ObservableAsyncMRUCache.cs已经基本上完全符合您的要求,特别是对相同内容的两个请求被“去抖动”。从评论:

/// ObservableAsyncMRUCache implements memoization for asynchronous or
/// expensive to compute methods. This memoization is an MRU-based cache
/// with a fixed limit for the number of items in the cache.     
///
/// This class guarantees that only one calculation for any given key is
/// in-flight at a time, subsequent requests will wait for the first one and
/// return its results (for example, an empty web image cache that receives
/// two concurrent requests for "Foo.jpg" will only issue one WebRequest -
/// this does not mean that a request for "Bar.jpg" will wait on "Foo.jpg").
///
/// Concurrency is also limited by the maxConcurrent parameter - when too
/// many in-flight operations are in progress, further operations will be
/// queued until a slot is available.
于 2012-10-01T01:03:02.877 回答
1

是的,Rx 非常适合这个。

我建议您考虑实现以下字典来支持您的密钥缓存:

Dictionary<K, AsyncSubject<V>>

您的异步获取数据部分只需要订阅主题即可使用结果填充它。

于 2012-09-29T22:34:59.870 回答
0

我会倾向于一个async解决方案,特别是使用AsyncLazy<T>(来自我的博客)

public sealed class MyCache<TKey, TValue>
{
  private readonly ConcurrentDictionary<TKey, AsyncLazy<TValue>> dictionary =
      new ConcurrentDictionary<TKey, AsyncLazy<TValue>>();

  private readonly Func<TKey, Task<TValue>> LookupAsync;

  public MyCache(Func<TKey, Task<TValue>> lookupAsync)
  {
    LookupAsync = lookupAsync;
  }

  public AsyncLazy<TValue> Get(TKey key)
  {
    return dictionary.GetOrAdd(key,
        key => new AsyncLazy<TValue>(() => lookupAsync(key)));
  }
}

这是一个非常简单的“缓存”,因为它没有过期时间。它可以这样使用:

MyCache<string, MyResource> cache = new MyCache<string, MyResource>(async key =>
{
  MyResource ret = await DataLayer.GetResourceAsync(key);
  ...
  return ret;
});
MyResource resource = await cache.Get("key");

在多线程情况下,GetOrAdd可能会创建一个额外的AsyncLazy<TValue>,但它永远不会被await编辑,因此lookupAsync每个 只会被调用一次TKey。另请注意,lookupAsync始终从线程池中调用。

PS 如果你去async,你可能会发现我的asyncWCF 帖子很有帮助。

于 2012-09-30T01:19:47.263 回答