0

我的目标是为“Type2”对象每 5 秒调用一次更新处理程序。observable 每 5 秒会生成一个以上的值,但我想忽略在最后一次处理更新后 5 秒内发生的所有值。

我在这里问了这个问题: 仅在满足特定条件时才进行节流

并得到了很好的反馈。它导致我使用 Observable.Window 来尝试实现我的目标。我以为我让它工作了,但事实证明,如果第一次更新恰好在窗口关闭之前发生(因此更新被处理),然后一旦下一个窗口打开,另一个更新到达并且也被处理,当我不希望它发生,因为它在最后一次处理更新的 5 秒内出现。

这是一些演示该问题的代码,对链接中的代码稍作修改:

var source = new Subject<Thing>();    
var feed = source.Publish().RefCount();

var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
    .Where(t => t.ActivationType == "Type2")
    .Window(() =>
            Observable.Timer(TimeSpan.FromSeconds(5))
            .Do(t => Console.WriteLine("\nTICK: " + DateTime.Now.ToString("hh:mm:ss:fff"))))
    .Select(x => x.Take(1))
    .Merge()
    .Do(t => Console.WriteLine("A new window opened " + DateTime.Now.ToString("hh:mm:ss:fff")));


var query = ofType1.Merge(ofType2);        
query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));    

int msDelay = 3000;
Task task = Task.Factory
    .StartNew(() => { Thread.Sleep(msDelay); })
    .ContinueWith((Task starter) =>
        {
            while (running)
            {
                var thing = new Thing();  //Note that all Things are by default Type2
                source.OnNext(thing);
                Thread.Sleep(100);
            }
        }, TaskContinuationOptions.LongRunning);

Console.ReadLine();

因此,订阅已完成,一旦订阅完成,Window 中使用的 Observable.Timer 就会开始。用于生成值的 while 循环直到 3000 毫秒延迟后才开始。

输出如下所示:

A new window opened 03:48:03:725
UPDATE: 1ac54fb3-f73d-4840-b4d8-95d4250ce65d 03:48:03:752

TICK: 03:48:05:714
A new window opened 03:48:05:754
UPDATE: 12d36e53-010f-4ccd-b9f8-2951b085f88c 03:48:05:754

TICK: 03:48:10:730
A new window opened 03:48:10:755
UPDATE: 25d84e72-94f9-4f50-83f4-14c1004c10fa 03:48:10:755

TICK: 03:48:15:738
A new window opened 03:48:15:755
UPDATE: 5f32b7d5-196f-445c-bf25-5c362b2fd6f0 03:48:15:755

TICK: 03:48:20:747
A new window opened 03:48:20:756
UPDATE: e3a3a30d-8031-41b5-b115-499dbe91aaf7 03:48:20:756

TICK: 03:48:25:755
A new window opened 03:48:25:756
UPDATE: 239fb25b-5135-463b-bf7e-5728ffa07f5c 03:48:25:756

如您所见,第一个 Type2 更新是在窗口打开时进行的,因此会得到处理。然后,2 秒后,Window 的计时器开始计时并打开一个新窗口。它会立即处理下一个 Type2 更新,我不希望它这样做。之后它看起来可以正常工作(按照 Window 声明中的定义,每 5 秒更新一次)。

有没有一种方法或另一种方法可以用来确保每 5 秒(或我选择的任何时间范围)只有一次更新?

4

1 回答 1

1

我想我有一个解决方案,但首先我可以提出一些建议。我认为这个问题有很多噪音,很难找到真正的问题是什么。

实际上,您是在问“我怎样才能获得一个值,然后保持至少 5 秒的静音”。Type1代码让人分心。序列的生成也是一种干扰。

因此,让我们清理示例代码,看看我们是否可以看到树木的木材:

首先,我不认为这是完全相关的类型是什么。在您的示例中,我们从不推送 a Type1,因此我们只使用整数。它可能会让事情变得更容易。

接下来,我们可以只使用 Observable.Timer 而不是大循环+任务+thread.sleep 的东西来清理创建。

现在我们有一个简单的起点:

var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool);   
var feed = source.Publish().RefCount();

所以我们的第一个问题是对您正在使用的 Window 重载的误解。我认为您的期望是当第一个值被推送时它会打开一个窗口。事实并非如此。每次打开窗口时都会订阅 Timer(即 Observable.Timer(TimeSpan.FromSeconds(5))),最初是订阅发生时,然后在窗口本身关闭时再次订阅。因此,计时器立即启动,您只会获得第一个窗口的 2 秒值。

接下来,我将画出我的问题空间。我最喜欢的方法是使用大理石图。他们在 ASCII 中翻译得不是很好,但让我们尝试任何方式。

给定这个输入序列:

//Seconds             1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths    01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
//
//source  : ------------------------------0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0

这应该表示在 3.0 秒时产生的值“0”。然后是 3.3 秒时的“1”等。

现在有了一点问题空间的清晰性,我们可以画出我们认为窗口应该打开的位置,它应该关闭的位置以及下一个窗口应该打开的位置。

让我们看看我们想要什么。

在这里,我们添加了一个 Window1 (W1),它在第一个值被按下时打开 ('1' @3.0s)。5 秒后关闭。在此窗口期间,我们希望在第一个值 1 之后保持静音。

窗口 2(W2)应该在下一个值产生后打开,而不是在最后一个窗口关闭后立即打开(我认为?!)。在这里,我们看到当在 8.4 秒时按下值“17”时,这是 Opened。

//Seconds             1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths    01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890

//source  : ------------------------------0--1--2--3--4--5--6--7--8--9--1--1--1--1--1--1--1--1--1--1--2--2--2--2--2--2--2--2--2--2--3--3--3--3--3--3--3--3--3--3--4--4--4--4-
                                                                        0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5  6  7  8  9  0  1  2  3
//
//W1      :                               0-------------------------------------------------|
//W2      :                                                                                  (17)----------------------------------------------|
//W3      :                                                                                                                                     (34)------------------------>

//expected: ------------------------------0--------------------------------------------------1--------------------------------------------------3---------------------------
                                                                                             7                                                  4

既然我们现在正在寻找什么值,我们就可以构造一个查询。

我想出了这个。我假设提要实际上是一个热序列。使用这个假设,我构建了一个重复结构,它从提要中获取 1 个值,并将 5 秒的静默连接到序列中。然后我添加了重复操作符,它只会在 5 秒的静默期过后重新订阅提要。

public static IObservable<T> Silencer<T>(this IObservable<T> source, TimeSpan minSilencePeriod)
{
    return source.Take(1)
                 .Concat(Observable.Empty<T>().Delay(minSilencePeriod))
                 .Repeat();
}

这确实会按预期产生值 0、17、51 等。

现在将其应用于原始问题的代码(清理一些东西)

void Main()
{
    var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool).Select(_=>new Thing());   
    var feed = source.Publish().RefCount();

    var ofType1 = feed.Where(t => t.ActivationType == "Type1");
    var ofType2 = feed
            .Where(t => t.ActivationType == "Type2")
            .Silencer(TimeSpan.FromSeconds(5));


    var query = ofType1.Merge(ofType2);        
    var subscription = query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));    


    Console.ReadLine();
    subscription.Dispose();
}

我们看到输出值至少相隔 5 秒

UPDATE: 3f0fc6f3-8a5a-476f-9661-b7330ab77877 09:14:04:725
UPDATE: fc8f0025-7a79-4329-8164-b8b421ad5865 09:14:09:817
UPDATE: ad739a71-885e-4d5b-a352-2302df0a4d87 09:14:14:925
于 2013-03-26T09:23:08.040 回答