编辑:为了简化这里的事情是范式:我有一个由连续流不断更新的项目列表。时不时地,我会得到一个重新初始化流的新数据快照。因此,如果在我想重新初始化时发生任何更新,我需要确保这些更新停止并且正在使用新快照。
我正在处理一些需要显示到 UI 的连续更新数据流。更新需要以相反的顺序显示,即最近的更新位于列表顶部。为了在顶部显示结果,我必须插入到列表中。我遇到的问题是有时列表需要休息(即 List.Clear),但是,如果我在插入中间,我需要停止它,因为否则插入会导致异常。
我已经整理了一种反应方法来帮助我解决这个问题,但是,它似乎忽略了我的直到流。
public static IObservable<T> BufferAndDispatchUntil<T, TStopUnit>(
this IObservable<T> source,
Action<T> onNext,
IScheduler scheduler,
IObservable<TStopUnit> until,
DispatcherPriority dispatcherPriority = DispatcherPriority.Background)
{
if (source == null) throw new ArgumentNullException("source");
if (onNext == null) throw new ArgumentNullException("onNext");
if (Application.Current == null)
return source.Do(onNext);
var dispatcher = Application.Current.Dispatcher;
return source
.LazyBuffer(BufferTime, BufferCount, scheduler)
.TakeUntil(until)
.Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))
.SelectMany(i => i);
}
LazyBuffer 是 Buffer 的自定义实现,它只在有新项可用时返回结果集,而不是在指定的时间间隔返回空结果集。这就是我调用它的方式,它如上所述。
BufferAndDispatchUntil(p => Update.Insert(p.Item1, UpdateFactory.CreateView(p.Item2)), _config.DispatcherScheduler, _ignore);
这是我在单独线程上运行的单独代码段中的明确调用。
_ignore.OnNext(new Unit());
Update.Clear();
如果您能帮我弄清楚,我将不胜感激。