0

我正在使用 Silverlight 的 Reactive 框架,并希望实现以下目标。

我正在尝试为 Silverlight 客户端创建一个典型的数据提供程序,该客户端还利用 MS Ent Lib 中提供的缓存框架。这些方案要求我必须在访问 WCF 数据客户端之前检查缓存中的键值对。

通过使用 Rx 扩展 Amb,我可以从缓存或 WCF 数据客户端中提取数据,以先返回者为准,但是如果值在缓存中,我如何阻止 WCF 客户端执行调用?

我还想考虑竞争条件,例如,如果第一个订阅者请求一些数据并且提供者正在从 WCF 数据客户端(异步)获取数据,我如何防止后续异步请求做同样的事情(在这个阶段,缓存尚未填充)。

4

3 回答 3

1

我有完全相同的问题。我使用具有以下签名的扩展方法解决了它:

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, R> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler) where R : class

实际上,这样做是在源 observable 中获取并返回一个 observable,它将每个输入值与其输出值匹配。

要获取每个输出值,它将首先检查缓存。如果该值存在于缓存中,则使用该值。如果不是,它只会在不在缓存中的值上启动fetch函数。如果所有值都在缓存中,则该函数将永远不会启动 - 因此没有服务连接设置惩罚等。fetch

我会给你代码,但它基于使用Maybe<T>monad 的扩展方法的一个稍微不同的版本——所以你可能会发现你需要摆弄实现。

这里是:

    public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
        where R : class
    {
        return source.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
    }

    public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
    {
        var results = new Subject<R>();

        var disposables = new CompositeDisposable();

        var loop = new EventLoopScheduler();
        disposables.Add(loop);

        var sourceDone = false;
        var pairsDone = true;
        var exception = (Exception)null;

        var fetchIn = new Subject<T>();
        var fetchOut = (IObservable<R>)null;
        var pairs = (IObservable<KeyValuePair<int, R>>)null;

        var lookup = new Dictionary<T, int>();
        var list = new List<Maybe<R>>();
        var cursor = 0;

        Action checkCleanup = () =>
        {
            if (sourceDone && pairsDone)
            {
                if (exception == null)
                {
                    results.OnCompleted();
                }
                else
                {
                    results.OnError(exception);
                }
                loop.Schedule(() => disposables.Dispose());
            }
        };

        Action dequeue = () =>
        {
            while (cursor != list.Count)
            {
                var mr = list[cursor];
                if (mr.HasValue)
                {
                    results.OnNext(mr.Value);
                    cursor++;
                }
                else
                {
                    break;
                }
            }
        };

        Action<KeyValuePair<int, R>> nextPairs = kvp =>
        {
            list[kvp.Key] = Maybe<R>.Something(kvp.Value);
            dequeue();
        };

        Action<Exception> errorPairs = ex =>
        {
            fetchIn.OnCompleted();
            pairsDone = true;
            exception = ex;
            checkCleanup();
        };

        Action completedPairs = () =>
        {
            pairsDone = true;
            checkCleanup();
        };

        Action<T> sourceNext = t =>
        {
            var mr = cache(t);
            list.Add(mr);
            if (mr.IsNothing)
            {
                lookup[t] = list.Count - 1;
                if (fetchOut == null)
                {
                    pairsDone = false;
                    fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
                    pairs = fetchIn.Select(x => lookup[x]).Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
                    disposables.Add(pairs.ObserveOn(loop).Subscribe(nextPairs, errorPairs, completedPairs));
                }
                fetchIn.OnNext(t);
            }
            else
            {
                dequeue();
            }
        };

        Action<Exception> errorSource = ex =>
        {
            sourceDone = true;
            exception = ex;
            fetchIn.OnCompleted();
            checkCleanup();
        };

        Action completedSource = () =>
        {
            sourceDone = true;
            fetchIn.OnCompleted();
            checkCleanup();
        };

        disposables.Add(source.ObserveOn(loop).Subscribe(sourceNext, errorSource, completedSource));

        return results.ObserveOn(scheduler);
    }

示例用法如下所示:

您将拥有要获取的索引的来源:

IObservable<X> source = ...

您将有一个可以从缓存中获取值的函数和一个可以将它们放入的操作(并且两者都应该是线程安全的):

Func<X, Y> getFromCache = x => ...;
Action<X, Y> addToCache = (x, y) => ...;

然后,您将实际调用从数据库或服务中获取数据:

Func<X, Y> getFromService = x => ...;

然后你可以这样定义fetch

Func<IObservable<X>, IObservable<Y>> fetch =
    xs => xs.Select(x =>
    {
        var y = getFromService(x);
        addToCache(x, y);
        return y;
    });

最后,您可以通过调用以下命令进行查询:

IObservable<Y> results =
    source.FromCacheOrFetch(
        getFromCache,
        fetch,
        Scheduler.ThreadPool);

当然,您需要订阅结果才能进行计算。

于 2012-05-21T09:02:25.360 回答
0

显然Amb这不是正确的方法,因为每次都会同时影响缓存和服务。如果缓存未命中,EntLib 会返回什么?

请注意,这Observable.Timeout是一个合理的选择:

cache(<paramters>).Timeout(TimeSpan.FromSeconds(1), service<paramters>);

但显然,如果您想处理来自 EntLib 的返回并采取适当的行动,那么超时并不是一个好主意。

我不明白为什么这必然是一个反应性扩展问题。

于 2012-05-21T07:42:54.697 回答
0

一种简单的方法,可能不如@Enigmativity 的解决方案功能全面,可能类似于:

public IObservable<T> GetCachedValue<TKey, TResult>(TKey key, Func<TKey, TResult> getFromCache, Func<TKey, TResult> getFromSource)
{
    return getFromCache(<key>).Concat(getFromSource(<key>).Take(1);
}

这只是一个松散的想法,您需要添加:

  • 将项目添加到缓存的机制,或假设 getFromSource 缓存结果
  • 某种线程安全性以防止对同一未缓存键的源多次命中(如果需要)
  • 如果项目不在缓存中,getFromCache 将需要返回 Observable.Empty()。

但如果你想要一些简单的东西,这不是一个糟糕的起点。

于 2015-07-07T04:46:02.570 回答