我的目标是为“Type2”对象每 5 秒调用一次更新处理程序。observable 每 5 秒会生成一个以上的值,但我想忽略在最后一次处理更新后 5 秒内发生的所有值。
我在这里问了这个问题: 仅在满足特定条件时才进行节流
并得到了很好的反馈。它导致我使用 Observable.Window 来尝试实现我的目标。我以为我让它工作了,但事实证明,如果第一次更新恰好在窗口关闭之前发生(因此更新被处理),然后一旦下一个窗口打开,另一个更新到达并且也被处理,当我不希望它发生,因为它在最后一次处理更新的 5 秒内出现。
这是一些演示该问题的代码,对链接中的代码稍作修改:
var source = new Subject<Thing>();
var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
.Where(t => t.ActivationType == "Type2")
.Window(() =>
Observable.Timer(TimeSpan.FromSeconds(5))
.Do(t => Console.WriteLine("\nTICK: " + DateTime.Now.ToString("hh:mm:ss:fff"))))
.Select(x => x.Take(1))
.Merge()
.Do(t => Console.WriteLine("A new window opened " + DateTime.Now.ToString("hh:mm:ss:fff")));
var query = ofType1.Merge(ofType2);
query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));
int msDelay = 3000;
Task task = Task.Factory
.StartNew(() => { Thread.Sleep(msDelay); })
.ContinueWith((Task starter) =>
{
while (running)
{
var thing = new Thing(); //Note that all Things are by default Type2
source.OnNext(thing);
Thread.Sleep(100);
}
}, TaskContinuationOptions.LongRunning);
Console.ReadLine();
因此,订阅已完成,一旦订阅完成,Window 中使用的 Observable.Timer 就会开始。用于生成值的 while 循环直到 3000 毫秒延迟后才开始。
输出如下所示:
A new window opened 03:48:03:725
UPDATE: 1ac54fb3-f73d-4840-b4d8-95d4250ce65d 03:48:03:752
TICK: 03:48:05:714
A new window opened 03:48:05:754
UPDATE: 12d36e53-010f-4ccd-b9f8-2951b085f88c 03:48:05:754
TICK: 03:48:10:730
A new window opened 03:48:10:755
UPDATE: 25d84e72-94f9-4f50-83f4-14c1004c10fa 03:48:10:755
TICK: 03:48:15:738
A new window opened 03:48:15:755
UPDATE: 5f32b7d5-196f-445c-bf25-5c362b2fd6f0 03:48:15:755
TICK: 03:48:20:747
A new window opened 03:48:20:756
UPDATE: e3a3a30d-8031-41b5-b115-499dbe91aaf7 03:48:20:756
TICK: 03:48:25:755
A new window opened 03:48:25:756
UPDATE: 239fb25b-5135-463b-bf7e-5728ffa07f5c 03:48:25:756
如您所见,第一个 Type2 更新是在窗口打开时进行的,因此会得到处理。然后,2 秒后,Window 的计时器开始计时并打开一个新窗口。它会立即处理下一个 Type2 更新,我不希望它这样做。之后它看起来可以正常工作(按照 Window 声明中的定义,每 5 秒更新一次)。
有没有一种方法或另一种方法可以用来确保每 5 秒(或我选择的任何时间范围)只有一次更新?