4

我在一个常见的场景中工作,我想访问存储库的一个子集,而不必担心必须保持更新,例如“获取价格大于 10 的所有订单”。我已经实施了一个解决方案,但有两个问题(最后列出)。

一个存储库的子集可以用相当于的东西来实现

var expensiveOrders = Repository.GetOrders().Where(o => o.Price > 10);

但这是一个IEnumerable并且不会在更新原始集合时更新。我可以添加处理程序CollectionChanged,但是如果我们想访问更多的子集怎么办?

var expensiveOrdersFromBob = expensiveOrders.Where(o => o.Name == Bob);

我们还必须为这个连接一个集合更改。实时更新的概念让我想到了 Rx,所以我开始构建一个ObservableCache,它包含ObservableCollection自动更新的项目和用于通知的 RX 流。(流也是更新引擎盖下缓存的内容。)

class ObservableCache<T> : IObservableCache<T>
{
    private readonly ObservableCollection<T> _cache;
    private readonly IObservable<Tuple<T, CRUDOperationType>> _updates;

    public ObservableCache(IEnumerable<T> initialCache
       , IObservable<Tuple<T, CRUDOperationType>> currentStream, Func<T, bool> filter)
        {
          _cache = new ObservableCollection<T>(initialCache.Where(filter));
          _updates = currentStream.Where(tuple => filter(tuple.Item1));
          _updates.Subscribe(ProcessUpdate);
        }

    private void ProcessUpdate(Tuple<T, CRUDOperationType> update)
    {
        var item = update.Item1;
        lock (_cache)
        {
            switch (update.Item2)
            {
                case CRUDOperationType.Create:
                    _cache.Add(item);
                    break;
                case CRUDOperationType.Delete:
                    _cache.Remove(item);
                    break;
                case CRUDOperationType.Replace:
                case CRUDOperationType.Update:
                    _cache.Remove(item); // ToDo: implement some key-based equality
                    _cache.Add(item);
                    break;
            }
        }
    }

    public ObservableCollection<T> Cache
    {
        get { return _cache; }
    }

    public IObservable<T> Updates
    {
        get { return _updates.Select(tuple => tuple.Item1); }
    }

    public IObservableCache<T> Where(Func<T, bool> predicate)
    {
        return new ObservableCache<T>(_cache, _updates, predicate);
    }
}

然后你可以像这样使用它:

var expensiveOrders = new ObservableCache<Order>(_orders
                                                 , updateStream
                                                 , o => o.Price > 10);
expensiveOrders.Updates.Subscribe
     (o => Console.WriteLine("Got new expensive order: " + o));
_observableBoundToSomeCtrl = expensiveOrders.Cache;

var expensiveOrdersFromBob = expensiveOrders
                             .Where(o => o.Name == "Bob");
expensiveOrdersFromBob.Updates.Subscribe
         (o => Console.WriteLine("Got new expensive order from Bob: " + o));
_observableBoundToSomeOtherCtrl = expensiveOrdersFromBob.Cache;

依此类推,您可以继续将缓存投射到越来越窄的子集中,而不必担心它会不同步。那我的问题是什么?

  1. 我想知道我是否可以通过让 RX 本质上更新集合来消除 CRUD 的东西。也许用 Select 或类似的东西“投影”更新?
  2. 存储库更新模式固有的竞争条件,因为我在构建新缓存时可能会错过一些更新。我想我需要某种排序,但这意味着让我所有的 T 对象都实现一个ISequenceableItem接口。有没有更好的方法来做到这一点?RX 很棒,因为它为您处理所有线程。我想利用这一点。
4

4 回答 4

1

http://github.com/wasabii/OLinq上的 OLinq 项目就是为这种反应式更新而设计的,我认为 ObservableView 就是您所追求的。

于 2013-07-01T11:26:09.100 回答
1

看看这两个项目虽然通过不同的方式实现了你想要的东西:

https://github.com/RolandPheasant/DynamicData

https://bitbucket.org/mendelmonteiro/reactivetables [免责声明:这是我的项目]

于 2015-06-17T13:46:27.517 回答
1

假设您有这样的定义:

class SetOp<T>
{
    public T Value { get; private set; }
    public bool Include { get; private set; }
    public SetOp(T value, bool include)
    {
        Value = value;
        Include = include;
    }
}

使用Observable.Scan并且System.Collections.Immutable您可以执行以下操作:

IObservable<SetOp<int>> ops = ...;
IImmutableSet<int> empty = ImmutableSortedSet<int>.Empty;
var observableSet = ops
    .Scan(empty, (s, op) => op.Include ? s.Add(op.Value) : s.Remove(op.Value))
    .StartWith(empty);

使用不可变集合类型是这里的关键技巧:任何观察者都observableSet可以对推送给它的值做任何想做的事情,因为它们是不可变的。添加它是有效的,因为它在连续值之间重用了大部分集合数据结构。

这是一个ops流和相应的示例observableSet

ops       observableSet
--------  ------------------
          {}
Add 7     {7}
Add 4     {4,7}
Add 5     {4,5,7}
Add 6     {4,5,6,7}
Remove 5  {4,6,7}
Add 8     {4,6,7,8}
Remove 4  {6,7,8}
于 2015-06-17T18:54:37.713 回答
0

您不需要锁定_cacheProcessUpdate. 如果您的源 observablecurrentStream遵守 Rx 指南,则保证您一次只能在一次调用中OnNext。换句话说,当您仍在处理前一个值时,您不会从流中接收另一个值。

解决竞争条件的唯一可靠方法是确保在updateStream开始生成数据之前创建缓存。

您可能想看看Extensions for Reactive Extensions (Rxx)。我相信 Dave 已经构建了许多用于将 UI 控件绑定到可观察数据的实用程序。文档很少。我不知道你在做什么。

于 2013-04-15T20:31:07.917 回答