1

我想通过在特定时间间隔内从一个元素移动到另一个元素来迭代一个集合。因此,例如,此方法可以正常工作:

        var a = new List<int> { 1, 2, 3, 4}.ToObservable();
        var b = Observable.Interval(TimeSpan.FromSeconds(1));
        var c = a.Zip(b, (l, r) => l);
        c.Subscribe(x => Debug.WriteLine(x));

但是我想使用列表中每个元素的值作为间隔,所以我使用了这段代码:

        var a = new List<int> { 1, 2, 3, 4}.ToObservable();
        var b = a.Delay(x => Observable.Timer(TimeSpan.FromSeconds(x)));
        b.Subscribe(x => Debug.WriteLine(x));

正如这里所说的http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx “延迟的新重载允许一个指定(可选)订阅的延迟以及基于选择器函数的每个元素的延迟”。但是运行代码并不像预期的那样工作。它只是以 1 秒的间隔吐出列表中的元素。像这样:

...(1 秒)1 ...(1 秒)2 ...(1 秒)3 ...(1 秒)4

代替

...(1 秒)1 ...(2 秒)2 ...(3 秒)3 ...(4 秒)4

我错过了什么吗?

4

3 回答 3

1

这个怎么样:

new[] {1,2,3,4,5,}.ToObservable()
    .Select(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(x)))
    .Concat();

正确的?您想将数字用作值以及延迟时间量吗?

于 2013-09-27T06:58:37.380 回答
1

这是一个解决方案,它只在可枚举源上推进迭代器一步,然后再次等待(适用于无限可枚举源序列):

static IObservable<T> ToObservableDelay<T>(
    IEnumerable<T> source, 
    Func<T, TimeSpan> delaySelector, 
    IScheduler scheduler
)
{
    return Observable.Create<T>(o =>
        scheduler.ScheduleAsync(async (s, c) =>
        {
            try
            {
                foreach (var x in source)
                {
                    await Task.Delay(delaySelector(x), c);
                    o.OnNext(x);
                }
                o.OnCompleted();
            }
            catch (TaskCanceledException) { /* ignore */ }
            catch (Exception e)
            {
                o.OnError(e);
            }
         })
    );
}

这里有一个带有无限生成器的小演示:

static IEnumerable<double> Uniform(Random rng)
{
    while (true)
        yield return rng.NextDouble();
}

static void Main(string[] args)
{
    var source = Uniform(new Random());

    Console.WriteLine("press any key to quit");
    using (var subscription = 
        ToObservableDelay(source, TimeSpan.FromSeconds, Scheduler.Default)
        .Subscribe(Console.WriteLine))
    {
        Console.ReadKey();
    }
}

在 RT 应用程序中运行的简单演示代码:

 var source = Uniform(new Random());
 ToObservableDelay(source, TimeSpan.FromSeconds, Scheduler.Default)
     .Take(10)
     .Subscribe(x => Debug.WriteLine(x));
于 2013-09-27T08:00:21.067 回答
0

我在尝试解决相同问题时发现了这个问题。我的解决方案是将 Observable.Generator 与 IEnumerable 的枚举器一起使用,如下所示:

public static IObservable<T> ToObservable<T>(this IEnumerable<T> source, TimeSpan time)
{
    var enumerator = source.GetEnumerator();

    var observable = Observable.Generate(
        enumerator.Current,
        _ => enumerator.MoveNext(),
        _ => enumerator.Current,
        _ => enumerator.Current,
        _ => time);

    return observable;
}

然后像这样简单地调用这个扩展方法:

var observable = Enumerable.Range(1, 10)
    .ToObservable(TimeSpan.FromSeconds(1));

using (observable.Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press the Any key to abort");
    Console.ReadKey();
}
于 2014-06-09T12:35:02.067 回答