3

我有以下类型...

public class NewsFeed
{
    public event EventHandler<NewsItemEventArgs> NewItem;

    .....

}

public class NewsItemEventArgs : EventArgs
{
    public NewsItem Item;
    public NewsItemEventArgs(NewsItem newsItem)
    {
        Item = newsItem;
    }
}

public class NewsItem
{
    public int Id { get; set; }
    public string Title { get; set; }
    public string Body { get; set; }
}

NewsFeed 的 NewItem 事件触发带有 NewsItemEventArgs 类型的 eventArgs 的事件。在我的系统中,事件以突发形式发布,例如在 1 秒的小窗口中发布 10 个 NewsItems,然后在 60 秒内没有进一步的新闻报道。我想用 RX 来平滑这些突发事件,这样我的 UI 新闻故事“似乎”以更规律的间隔(比如 5 秒)一次到达一个。

我知道我需要创建一个类似下面的 observable

_source = Observable.FromEventPattern<NewsItemEventArgs>(
                        h => _newsFeed.NewItem += h,
                        h => _newsFeed.NewItem -= h);

但我不知道如何转换和订阅 observable,以便我得到事件的滴灌,而不是上述突发事件。

有任何想法吗?

4

3 回答 3

3

Zip 可能不是此操作的最佳选择,因为生产者有时可能会变慢,从而导致输出抖动。

Rx 2.0似乎仍然无法进行准确的调度。DateTimeOffsetTimeSpan不过,现在工作。您可以通过将TimeSpan偏移量替换为DateTimeOffset.

综上所述,如果我们可以指定两个连续值之间的最小间隔,就可以解决突发问题。

    static IObservable<T> DelayBetweenValues<T>(this IObservable<T> observable, TimeSpan interval, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var offset = TimeSpan.Zero;
            return observable
                .TimeInterval(scheduler)
                .Subscribe
                (
                    ts =>
                    {
                        if (ts.Interval < interval)
                        {
                            offset = offset.Add(interval);
                            scheduler.Schedule(offset, () => observer.OnNext(ts.Value));
                        }
                        else
                        {
                            offset = TimeSpan.Zero;
                            observer.OnNext(ts.Value);
                        }
                    }
                );
        });
    }

测试:

        Observable.Interval(TimeSpan.FromSeconds(2.5))
                  .Do(_ => Console.WriteLine("Burst"))
                  .SelectMany(i => Enumerable.Range((int)i, 10))
                  .DelayBetweenValues(TimeSpan.FromSeconds(0.2), TaskPoolScheduler.Default)
                  .Subscribe(Console.WriteLine);
于 2012-09-29T14:56:24.583 回答
2

您可以使用另一个定期生成值的序列来压缩您的序列:

Observable<NewsItem> nis = _source
    .Zip(Observable.Timer(Timespan.FromSeconds(5), TimeSpan.FromSeconds(5)), (e, _) => e)
    .Select(eventArgs => eventArgs.Item);
于 2012-09-29T10:17:11.623 回答
1

I couldn't get the answer from Asti to work. So I tried the undocumented Delay overload with the delayDurationSelector. I got it working for my problem, though somehow it's not behaving correctly when I use the scheduler in the timer, but it works without it:

public static IObservable<T> DelayBetweenValues<T>(this IObservable<T> observable, TimeSpan interval,
    IScheduler scheduler)
{
        var offset =  TimeSpan.Zero;
        return observable
            .TimeInterval(scheduler)
            .Delay(ti =>
            {
                offset = (ti.Interval < interval) ? offset.Add(interval) : TimeSpan.Zero;
                return Observable.Timer(offset);
            })
            .Select(ti => ti.Value);
}
于 2013-12-17T21:32:10.937 回答