我正在寻找建立一个并行缓存。要求是需要一次触发n个数据收集器。这些数据收集器中的每一个都将到达边界层(称为服务层)并检索数据。但是,由于这是在同一个请求 (WCF) 中,如果 2 个数据收集器需要在服务层调用相同的方法,我不希望第二个请求等待第一个请求完成。
这需要对构建数据收集器的开发人员透明地构建(使用 Unity Interception 插入此缓存方面)。
这是流程的样子。反应式扩展是否适合这种设计?我过去没有与 Rx 合作过,并且不想在开发 10 天后碰壁。否则,async、await 和 events 的组合也可能在这里很好地发挥作用。
编辑:我使用 Rx 实现了这个 - 在多线程上下文中运行良好。有趣的是尝试 add 而不是 tryGet。(这是一个 Unity 拦截 CallHandler)
/// <summary>
/// Intercepts the calls and tries to retrieve from the cache
/// </summary>
class CacheCallHandler : ICallHandler
{
[Dependency]
public ICache RequestCache { get; set; }
public IMethodReturn Invoke(IMethodInvocation input, GetNextHandlerDelegate getNext)
{
IMethodReturn mesg = null;
string cacheKey = CacheKeyGenerator.GetCacheKey(input);
//create the task to retrieve the data
var task = new Task<IMethodReturn>(() =>
{
return getNext()(input, getNext);
});
//make it observable
var observableItem = task.ToObservable();
//try to add it to the cache
//we need to do this in the order of Add and then try to get, otherwise multiple thread might enter the same area
if (RequestCache.TryAdd(cacheKey, observableItem))
{
//if the add succeeed, it means that we are responsible to starting this task
task.Start();
}
else
{
if ( RequestCache.TryGetValue(cacheKey, out observableItem) )
{
//do nothing, the observable item is already updated with the requried reference
}
else
{
throw new CacheHandlerException("Could not add to cache AND could not retrieve from cache either. Something's wrong", input);
}
}
//observe the return
if ( observableItem != null )
mesg = observableItem.FirstOrDefault();
if (mesg == null)
throw new CacheHandlerException("Not return value found. this should not happen", input);
return mesg;
}
/// <summary>
/// Should always be the first to execute on the boundary
/// </summary>
public int Order
{
get { return 1; }
set { ; }
}
}