34

我想有效地限制事件流,以便在收到第一个事件时调用我的委托,但如果收到后续事件则不调用 1 秒。在超时(1 秒)到期后,如果收到后续事件,我希望调用我的代表。

有没有使用 Reactive Extensions 的简单方法来做到这一点?

示例代码:

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var builder = new StringBuilder();

    generator
        .Sample(TimeSpan.FromSeconds(1))
        .Finally(() => Console.WriteLine(builder.ToString()))
        .Subscribe(feed =>
                   builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",
                                                    feed.Value,
                                                    feed.Timestamp.ToString("mm:ss.fff"),
                                                    DateTime.Now.ToString("mm:ss.fff"))));

    Console.ReadKey();
}

电流输出:

Running...
Observed 064, generated at 41:43.602, observed at 41:43.602
Observed 100, generated at 41:44.165, observed at 41:44.602

但我想观察(时间戳显然会改变)

Running...
Observed 001, generated at 41:43.602, observed at 41:43.602
....
Observed 100, generated at 41:44.165, observed at 41:44.602
4

8 回答 8

16

好的,

你有3个场景:

1)我想每秒获得一个事件流的值。意思是:如果它每秒产生更多的事件,你将得到一个更大的缓冲区。

observableStream.Throttle(timeSpan)

2)我想获得最新的事件,即在第二次发生之前产生的意思是:其他事件被丢弃。

observableStream.Sample(TimeSpan.FromSeconds(1))

3)您想获取所有事件,这些事件发生在最后一秒。并且每一秒

observableStream.BufferWithTime(timeSpan)

4)你想用所有的值选择第二个之间发生的事情,直到第二个过去,你的结果被返回

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
于 2010-07-09T09:56:54.390 回答
14

这是我在 RX 论坛的帮助下得到的:

这个想法是为原始序列发射一系列“票”。这些“票”会因超时而延迟,不包括第一个,它会立即预先添加到票序列中。当一个事件进来并且有票在等待时,事件立即触发,否则它等到票然后触发。当它触发时,发出下一张票,依此类推......

要将门票和原创活动结合起来,我们需要一个组合器。不幸的是,“标准” .CombineLatest 不能在此处使用,因为它会触发之前使用的票证和事件。所以我必须创建我自己的组合器,它基本上是一个过滤的.CombineLatest,只有当组合中的两个元素都是“新鲜的”时才会触发——以前从未返回过。我称之为 .CombineVeryLatest aka .BrokenZip ;)

使用 .CombineVeryLatest,上面的想法可以这样实现:

    public static IObservable<T> SampleResponsive<T>(
        this IObservable<T> source, TimeSpan delay)
    {
        return source.Publish(src =>
        {
            var fire = new Subject<T>();

            var whenCanFire = fire
                .Select(u => new Unit())
                .Delay(delay)
                .StartWith(new Unit());

            var subscription = src
                .CombineVeryLatest(whenCanFire, (x, flag) => x)
                .Subscribe(fire);

            return fire.Finally(subscription.Dispose);
        });
    }

    public static IObservable<TResult> CombineVeryLatest
        <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
        IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
    {
        var ls = leftSource.Select(x => new Used<TLeft>(x));
        var rs = rightSource.Select(x => new Used<TRight>(x));
        var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
        var fltCmb = cmb
            .Where(a => !(a.x.IsUsed || a.y.IsUsed))
            .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
        return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
    }

    private class Used<T>
    {
        internal T Value { get; private set; }
        internal bool IsUsed { get; set; }

        internal Used(T value)
        {
            Value = value;
        }
    }

编辑:这是 Andreas Köpf 在论坛上提出的另一个更紧凑的 CombineVeryLatest 变体:

public static IObservable<TResult> CombineVeryLatest
  <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
  IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}
于 2010-07-11T21:42:26.560 回答
8

昨晚我在同样的问题上苦苦挣扎,并相信我找到了一个更优雅(或至少更短)的解决方案:

var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1));
var throttledSource = source.Take(1).Concat(delay).Repeat();
于 2011-04-29T12:59:34.167 回答
6

这是我在Rx 论坛中作为该问题的答案发布的内容:

更新:这是一个新版本,当事件发生的时间差超过一秒时,它不再延迟事件转发:

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        object gate = new Object();
        Notification<T> last = null, lastNonTerminal = null;
        DateTime referenceTime = DateTime.UtcNow - minInterval;
        var delayedReplay = new MutableDisposable();
        return new CompositeDisposable(source.Materialize().Subscribe(x =>
        {
            lock (gate)
            {
                var elapsed = DateTime.UtcNow - referenceTime;
                if (elapsed >= minInterval && delayedReplay.Disposable == null)
                {
                    referenceTime = DateTime.UtcNow;
                    x.Accept(o);
                }
                else
                {
                    if (x.Kind == NotificationKind.OnNext)
                        lastNonTerminal = x;
                    last = x;
                    if (delayedReplay.Disposable == null)
                    {
                        delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
                        {
                            lock (gate)
                            {
                                referenceTime = DateTime.UtcNow;
                                if (lastNonTerminal != null && lastNonTerminal != last)
                                    lastNonTerminal.Accept(o);
                                last.Accept(o);
                                last = lastNonTerminal = null;
                                delayedReplay.Disposable = null;
                            }
                        }, minInterval - elapsed);
                    }
                }
            }
        }), delayedReplay);
    });
}

这是我之前的尝试:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
    .Timestamp();

source.Publish(o =>
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
).Run(x => Console.WriteLine(x));
于 2010-07-09T15:23:29.237 回答
2

好的,这是一个解决方案。我不喜欢它,特别是,但是……哦,好吧。

向 Jon 指出 SkipWhile 和 cRichter 的 BufferWithTime 的帽子提示。多谢你们。

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1));

    var action = new Action<Timestamped<int>>(
        feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}",
                                  feed.Value,
                                  feed.Timestamp.ToString("mm:ss.fff"),
                                  DateTime.Now.ToString("mm:ss.fff")));

    var reactImmediately = true;
    bufferedAtOneSec.Subscribe(list =>
                                   {
                                       if (list.Count == 0)
                                       {
                                           reactImmediately = true;
                                       }
                                       else
                                       {
                                           action(list.Last());
                                       }
                                   });
    generator
        .SkipWhile(item => reactImmediately == false)
        .Subscribe(feed =>
                       {
                           if(reactImmediately)
                           {
                               reactImmediately = false;
                               action(feed);
                           }
                       });

    Console.ReadKey();
}
于 2010-07-09T12:44:20.293 回答
0

你试过Throttle扩展方法吗?

从文档:

忽略来自可观察序列的值,该序列后跟到期时间之前的另一个值

我不太清楚这是否会做你想做的事 - 因为你想忽略以下值而不是第一个值......但我希望它是你想要的。试试看 :)

编辑:嗯......不,我认为这毕竟不是正确的事情Throttle 我相信我看到了您想要做的事情,但我在框架中看不到任何东西可以做到这一点。不过,我很可能错过了一些东西。你在 Rx 论坛上问过吗?很可能如果它现在不存在,他们会很乐意添加它:)

怀疑你可以以某种方式巧妙地做到这一点SkipUntil......SelectMany但我认为它应该采用自己的方法。

于 2010-07-09T09:01:58.460 回答
0

您正在搜索的是 CombineLatest。

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector
)

that merges 2 obeservables, and returning all values, when the selector (time) has a value.

编辑:约翰是对的,这可能不是首选的解决方案

于 2010-07-09T09:03:41.977 回答
0

受 Bluelings 回答的启发,我在这里提供了一个与 Reactive Extensions 2.2.5 一起编译的版本。

此特定版本计算样本数并提供最后一个采样值。为此,使用以下类:

class Sample<T> {

  public Sample(T lastValue, Int32 count) {
    LastValue = lastValue;
    Count = count;
  }

  public T LastValue { get; private set; }

  public Int32 Count { get; private set; }

}

这是运算符:

public static IObservable<Sample<T>> SampleResponsive<T>(this IObservable<T> source, TimeSpan interval, IScheduler scheduler = null) {
  if (source == null)
    throw new ArgumentNullException(nameof(source));
  return Observable.Create<Sample<T>>(
    observer => {
      var gate = new Object();
      var lastSampleValue = default(T);
      var lastSampleTime = default(DateTime);
      var sampleCount = 0;
      var scheduledTask = new SerialDisposable();
      return new CompositeDisposable(
        source.Subscribe(
          value => {
            lock (gate) {
              var now = DateTime.UtcNow;
              var elapsed = now - lastSampleTime;
              if (elapsed >= interval) {
                observer.OnNext(new Sample<T>(value, 1));
                lastSampleValue = value;
                lastSampleTime = now;
                sampleCount = 0;
              }
              else {
                if (scheduledTask.Disposable == null) {
                  scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule(
                    interval - elapsed,
                    () => {
                      lock (gate) {
                        if (sampleCount > 0) {
                          lastSampleTime = DateTime.UtcNow;
                          observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
                          sampleCount = 0;
                        }
                        scheduledTask.Disposable = null;
                      }
                    }
                  );
                }
                lastSampleValue = value;
                sampleCount += 1;
              }
            }
          },
          error => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnError(error);
          },
          () => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnCompleted();
          }
        ),
        scheduledTask
      );
    }
  );
}
于 2017-03-27T21:22:45.303 回答