0

编辑:为了简化这里的事情是范式:我有一个由连续流不断更新的项目列表。时不时地,我会得到一个重新初始化流的新数据快照。因此,如果在我想重新初始化时发生任何更新,我需要确保这些更新停止并且正在使用新快照。

我正在处理一些需要显示到 UI 的连续更新数据流。更新需要以相反的顺序显示,即最近的更新位于列表顶部。为了在顶部显示结果,我必须插入到列表中。我遇到的问题是有时列表需要休息(即 List.Clear),但是,如果我在插入中间,我需要停止它,因为否则插入会导致异常。

我已经整理了一种反应方法来帮助我解决这个问题,但是,它似乎忽略了我的直到流。

public static IObservable<T> BufferAndDispatchUntil<T, TStopUnit>(
                             this IObservable<T> source, 
                             Action<T> onNext, 
                             IScheduler scheduler, 
                             IObservable<TStopUnit> until, 
                             DispatcherPriority dispatcherPriority = DispatcherPriority.Background)
{
        if (source == null) throw new ArgumentNullException("source");
        if (onNext == null) throw new ArgumentNullException("onNext");

        if (Application.Current == null)
            return source.Do(onNext);

        var dispatcher = Application.Current.Dispatcher;

        return source
            .LazyBuffer(BufferTime, BufferCount, scheduler)
            .TakeUntil(until)
            .Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))
            .SelectMany(i => i);
}

LazyBuffer 是 Buffer 的自定义实现,它只在有新项可用时返回结果集,而不是在指定的时间间隔返回空结果集。这就是我调用它的方式,它如上所述。

BufferAndDispatchUntil(p => Update.Insert(p.Item1, UpdateFactory.CreateView(p.Item2)), _config.DispatcherScheduler, _ignore);

这是我在单独线程上运行的单独代码段中的明确调用。

_ignore.OnNext(new Unit());  
Update.Clear();

如果您能帮我弄清楚,我将不胜感激。

4

2 回答 2

0

You can't stop "mid insert". You need to lock your data structures when performing non-atomic operations to ensure data integrity.

lock(someObj)
{
    myList.Insert(i, obj);
}

Make sure to lock at the smallest resolution possible, i.e., don't lock an entire method if you need only protect against a single non-atomic operation.

That, or have you looked at C#'s thread safe collections yet which handle much of the locking for you?

于 2012-05-18T21:36:58.060 回答
0

当您执行此操作时,您将撤销 Rx 为您提供的保证:

.Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))

这就是你遇到这个错误的原因,因为你在排队等待在 UI 线程中运行的东西,但是你的 Rx 管道不知道它,所以你最终分裂了。

于 2012-05-19T21:35:55.347 回答