6

我正在使用 Reactive 扩展(版本 2.1,以防万一)开始开发,对于我的示例应用程序,我需要以一定间隔(即每 1 秒)推送的一系列 int 值。

我知道,我可以创建一个序列,Observable.Range<int>(0,10)但我不知道如何设置推送之间的相对时间。我已经尝试过Delay(),但在开始时只改变了一次序列。

然后我找到Observable.Generate()了可以通过以下方式调整到此任务的方法:

var delayed = Observable.
              Generate(0, i => i <= 10, i => i + 1, i => i,
                          i => TimeSpan.FromSeconds(1));

但这似乎只适用于简单的“for-each-like”定义的序列。所以,总的来说,我的问题是,我们是否可以获取任何源序列并用一些代理包装它,该代理将从源中提取消息并在时间延迟的情况下进一步推送它?

S--d1--d2--d3--d4--d5-|
D--d1-delay-d2-delay-d3-delay-d4-delay-d5-|

PS如果这种方法与 ReactiveExtensions 的概念相矛盾,也请注意这一点。我不想“无论如何”这样做,他们将来会遇到其他一些设计问题。

PPS的一般理念是确保输出序列在事件之间具有指定的间隔,而不管输入序列是有限还是无限以及它推送事件的频率如何。

4

5 回答 5

9

Observable.Interval 是您要查看的内容。它将生成一个基于 0 的长值,在您指定的每个时间间隔内递增 1,例如:

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine(x));

然后,您可以根据需要使用投影 ( Select) 偏移/更改此值。

您还可以使用 Zip 运算符将一个流与另一个流“同步”——您可能也想查看它。Zip 将来自两个流的事件配对在一起,因此它以当前最慢的流的速度发出。Zip 也非常灵活,它可以压缩任意数量的流,甚至可以将 IObservable 压缩到 IEnumerable。这是一个例子:

var pets = new List<string> { "Dog", "Cat", "Elephant" };
var pace = Observable.Interval(TimeSpan.FromSeconds(1))
    .Zip(pets, (n, p) => p)     
    .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("Done"));

这会以 1 秒的间隔写出宠物。

根据上面添加的 PPS,我将给出另一个答案 - 我将把它留作参考,因为它无论如何都是一种有用的技术。

于 2013-10-05T23:38:07.513 回答
4

因此,为了澄清,您希望输出以不快于间隔的速率推动输入,否则尽可能快。

在这种情况下,试试这个。input变量构造是一种创建短的零星序列的方法,该序列有时比 2 秒快,有时比 2 秒慢。请注意,秒表的输出将显示 Rx 使用的计时器机制中的小错误。

var input = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4);
input = input.Concat(Observable.Interval(TimeSpan.FromSeconds(5)).Take(2));

var interval = TimeSpan.FromSeconds(2);

var paced = input.Select(i => Observable.Empty<long>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

var stopwatch = new Stopwatch();
stopwatch.Start();
paced.Subscribe(
    x => Console.WriteLine(x + " " + stopwatch.ElapsedMilliseconds),
    () => Console.WriteLine("Done"));

此示例通过将输入中的每个刻度投影到一个序列中来工作,该序列在开始时将刻度作为单个事件,但在所需的时间间隔内没有 OnComplete。然后连接得到的流。如果输出当前“刷新”,这种方法可确保立即发出新的滴答声,否则会相互缓冲。

您可以将其包装在扩展方法中以使其通用。

于 2013-10-09T08:25:58.370 回答
1

这是做你想做的最简单的方法:

var delayed =
    source.Do(x => Thread.Sleep(1000));

它增加了第二个延迟,但它确实在第一个项目之前这样做了。你当然可以总结一些逻辑,不要在一开始就延迟。那不会太难。


这是一个替代方案,可以安排新趋势的延迟。

var delayed =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return source
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });
于 2013-10-11T02:11:01.990 回答
1

我知道这是一个老问题,但我认为我有正确的答案。

即使源 Observable 没有产生任何东西,压缩 Observable.Timer 也会产生“滴答声”。这意味着一旦源产生另一个项目,任何已经产生但尚未消耗的刻度都将被使用。导致当生产者以稳定的速率生产项目时,会在项目之间增加延迟,但如果生产者有时需要更长的时间来生产项目,则会产生项目的爆发。

为了避免这种情况,您需要生成一个计时器,该计时器仅在您的 observable 生成的每个项目之间生成一个项目。您可以像这样使用 Observable.Switch 执行此操作:

var subject = new Subject<Unit>();

        var producer = subject.SelectMany(
                                  _ =>
                                  {
                                      return new[]
                                      {
                                          Observable.Return(true),
                                          Observable.Timer(TimeSpan.FromSeconds(2))
                                                    .Select(q => false)
                                      };
                                  })
                              .Switch();             
于 2017-02-24T14:51:30.417 回答
0

也许您正在寻找的是Buffer扩展方法。它的签名是这样定义的:

public static IObservable<IList<TSource>> Buffer<TSource>(
    this IObservable<TSource> source,
    TimeSpan timeSpan)

它将以批量生成值的方式转换源序列timeSpan

于 2013-10-13T09:36:36.300 回答