好的,我已经对 Reactive Extensions 进行了一些研究,我已经整理了代码,并且我有两个可能的答案。我不知道其中一种是否是“正确”的方式,但两者似乎都有效。RX 看起来很强大,但是使用它会扩展我的 C# 能力的极限。
1.问题
首先,重申这个问题,想象我有这个类:
public class Project
{
public delegate void InvalidateEventHandler();
public event InvalidateEventHandler Invalidated;
private void InvalidateMyself() { if (Invalidated != null) Invalidated(); }
public void HeftyComputation() { Thread.Sleep(2000); }
public void SimulateUserDoingStuff()
{
Thread.Sleep(100);
InvalidateMyself();
Thread.Sleep(100);
InvalidateMyself();
Thread.Sleep(100);
InvalidateMyself();
}
public Project() { }
}
它保存数据并且不是线程安全的。它有三种方法。一个人根据其内部数据执行大量计算,以便在数据发生变化时自行更新。这是通过具有调用 InvalidateMyself() 函数的内部属性(未演示)来实现的,触发要由父类处理的事件,父类将在某些时候决定告诉对象更新自身。最后,我有一个“模拟用户输入”方法,它在 t=0.1s、t=0.2s 和 t=0.3s 时调用 InvalidateMyself()。
现在,使此类更新自身的最简单方法是获取父对象,在这种情况下是应用程序演示者,侦听 Invalidate 事件并在它到来时直接触发 HeftyComputation() 方法。观察以下课程:
public class ApplicationPresenterBasic
{
private DateTime started;
public Project Project { get; set; }
public ApplicationPresenterBasic()
{
// Create the project and subscribe to the invalidation event
Project = new Project();
Project.Invalidated += Project_Invalidated;
// Simulate the user doing stuff
started = DateTime.Now;
Project.SimulateUserDoingStuff();
}
void Project_Invalidated()
{
UpdateProject();
}
void UpdateProject()
{
System.Diagnostics.Debug.WriteLine(string.Format("Running HeftyComputation() at {0}s", (DateTime.Now - started).TotalSeconds));
Project.HeftyComputation();
}
}
这基本上就是这样做的,除了它在每次对象“无效”时运行 HeftyComputation() 方法。这是输出:
Running HeftyComputation() at 0.1010058s
Running HeftyComputation() at 2.203126s
Running HeftyComputation() at 4.3042462s
很公平?现在假设我想要的行为是让应用程序演示者在执行 HeftyComputation() 之前等待 1 秒且没有失效。这样,所有三个更新都在 t=1.3 秒时同时处理。
我已经完成了这两种方法,一次使用监听线程和 System.Windows.Threading Dispatcher,一次使用 Reactive Extensions 的 IObservable.Throttle
2. 使用Tasks和System.Windows.Dispatcher的解决方案
我能想到的使用后台任务和调度程序的唯一优势是您不依赖任何第三方库。但是,您被锁定在引用 WindowsBase 程序集,所以我无法想象这会在 Mono 上工作,如果这对您很重要的话。
public class ApplicationPresenterWindowsDispatcher
{
private DateTime started;
public Project Project { get; set; }
/* Stuff necessary for this solution */
private delegate void ComputationDelegate();
private object Mutex = new object();
private bool IsValid = true;
private DateTime LastInvalidated;
private Task ObservationTask;
private Dispatcher MainThreadDispatcher;
private CancellationTokenSource TokenSource;
private CancellationToken Token;
public void ObserveAndTriggerComputation(CancellationToken ctoken)
{
while (true)
{
ctoken.ThrowIfCancellationRequested();
lock (Mutex)
{
if (!IsValid && (DateTime.Now - LastInvalidated).TotalSeconds > 1)
{
IsValid = true;
ComputationDelegate compute = new ComputationDelegate(UpdateProject);
MainThreadDispatcher.BeginInvoke(compute);
}
}
}
}
public ApplicationPresenterWindowsDispatcher()
{
// Create the project and subscribe to the invalidation event
Project = new Project();
Project.Invalidated += Project_Invalidated;
// Set up observation task
MainThreadDispatcher = Dispatcher.CurrentDispatcher;
Mutex = new object();
TokenSource = new CancellationTokenSource();
Token = TokenSource.Token;
ObservationTask = Task.Factory.StartNew(() => ObserveAndTriggerComputation(Token), Token);
// Simulate the user doing stuff
started = DateTime.Now;
Project.SimulateUserDoingStuff();
}
void Project_Invalidated()
{
lock (Mutex)
{
IsValid = false;
LastInvalidated = DateTime.Now;
}
}
void UpdateProject()
{
System.Diagnostics.Debug.WriteLine(string.Format("Running HeftyComputation() at {0}s", (DateTime.Now - started).TotalSeconds));
Project.HeftyComputation();
}
}
它是这样工作的:当创建应用程序演示者时,会产生一个在后台运行的任务。此任务监视一个表示 Project 对象是否仍然有效的布尔值和一个表示上次失效时间的 DateTime。当 bool 为 false 并且最后一次失效时间超过 1 秒之前,Task 使用 Dispatcher 在主线程上调用一个方法,该方法执行 HeftyComputation()。失效事件处理程序现在使主线程简单地更新 bool 和 DateTime 并等待后台线程决定何时运行更新。
输出:
Running HeftyComputation() at 1.3060747s
所以这似乎有效。这可能不是最好的方法,而且我不擅长并发,所以如果有人在这里看到错误或问题,请指出。
3. 使用响应式扩展的解决方案
这是使用反应式扩展的解决方案。幸运的是,NuGet 使将其添加到项目中变得微不足道,但对于我的技能水平的人来说,使用它是另一回事。Normon H 提到它只需要几行代码,事实证明这是真的,但是让这几行代码正确使用比编写 Dispatcher 解决方案花费的时间要长得多。
如果你擅长代理和 lambda,那么反应式扩展看起来很棒。不过我不是,所以我很挣扎。
public class ApplicationPresenterRX
{
private DateTime started;
public Project Project { get; set; }
public ApplicationPresenterRX()
{
// Create the project and subscribe to the invalidation event
Project = new Project();
var invalidations = Observable.FromEvent(ev => { this.Project.Invalidated += () => ev(); },
ev => { this.Project.Invalidated -= () => ev(); });
var throttledInvalidations = invalidations.Throttle(TimeSpan.FromSeconds(1));
throttledInvalidations.Subscribe(e => { UpdateProject(); });
// Simulate the user doing stuff
started = DateTime.Now;
Project.SimulateUserDoingStuff();
}
void UpdateProject()
{
System.Diagnostics.Debug.WriteLine(string.Format("Running HeftyComputation() at {0}s", (DateTime.Now - started).TotalSeconds));
Project.HeftyComputation();
}
}
就是这样,三行。我从 Project.Invalidated 事件创建一个 IObservable 源,创建第二个 IObservable 源来限制第一个源,然后订阅它,以便在受限制的源激活时调用我的 UpdateProject() 方法。弄清楚如何用我的事件正确调用 Observable.FromEvent 方法是该过程中最难的部分。
输出:
Running HeftyComputation() at 1.3090749s
通过阅读此网站: http: //www.introtorx.com/uat/content/v1.0.10621.0/13_SchedulingAndThreading.html我的收获是,虽然 RX 使用线程进行计时和调度,但默认情况下它不会调用代码一个不同的线程。因此 UpdateProject() 方法应该在主线程上运行。