12

我有一个应用程序,它在某些时候几乎同时引发 1000 个事件。我想做的是将事件批处理为 50 个项目的块,并每 10 秒开始处理它们。在开始新的批处理之前无需等待批处理完成。

例如:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))

任何想法如何实现这一目标?我想 Reactive Extensions 是要走的路,但其他解决方案也是可以接受的。

我试图从这里开始:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5)

但注意到延迟并没有像我希望的那样起作用,而是所有批次同时开始,尽管延迟了 5 秒。

我还测试了 Window 方法,但我没有注意到行为上有任何差异。我想 Window 中的 TimeSpan 实际上意味着“获取接下来 10 秒内发生的每个事件:

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

我正在使用 Rx-Main 2.0.20304-beta。

4

4 回答 4

23

如果你不想让线程休眠,你可以这样做:

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);
于 2012-06-11T13:26:24.487 回答
2

为此有一个特定的 Buffer 方法重载:https ://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

observable.Buffer(TimeSpan.FromSeconds(5), 50);
于 2016-04-08T12:41:27.540 回答
1

这是一个令人惊讶的难以解决的问题。更是如此,因为使用Zip运算符将​​可观察对象与 , 对齐的诱人想法Observable.Interval是错误的并且效率低下。与非对称可观察对象一起使用时,运算符的主要问题Zip是它缓冲了最快生成的可观察对象的元素,从而可能导致在长期订阅期间分配大量内存。恕我直言,此运算符的使用应仅限于预期在长期内产生相等(或接近相等)数量的元素的成对可观察对象。

当发出值比源 observable 更快时, Zip+Observable.Interval组合的错误行为就会出现。Observable.Interval在这种情况下,由 发出的多余值Observable.Interval被缓冲,因此当源 observable 发出下一个元素时,已经有一个缓冲Interval值形成一对,导致违反“元素之间的最小间隔”策略。

下面是一个自定义WithInterval运算符的实现,它在可观察序列的连续元素之间施加最小间隔。然后,此运算符将用于解决此问题的特定问题,该问题涉及缓冲区而不是单个元素:

/// <summary>Intercepts a minimum interval between consecutive elements of an
/// observable sequence.</summary>
public static IObservable<T> WithInterval<T>(this IObservable<T> source,
    TimeSpan interval, IScheduler scheduler = null)
{
    return source
        .Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
        {
            var (previousTimer, _) = state;
            var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
                : Observable.Timer(interval)).PublishLast();
            var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
            return (timer, delayed);
        })
        .Select(e => e.Item2)
        .Concat();
}

Observable.Timer此实现在连续元素之间放置一个。棘手的部分是如何在正确的时刻激活每个计时器。它是通过Publish设置计时器来实现的,并在每个计时器完成时让下一个计时器预热(Connect)。

有了这个操作符,实现一个自定义BatchWithInterval操作符就很简单了:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers which are produced based on element count information,
/// intercepting a minimum interval between consecutive buffers.</summary>
public static IObservable<IList<T>> BatchWithInterval<T>(this IObservable<T> source,
    int count, TimeSpan interval, IScheduler scheduler = null)
{
    return source.Buffer(count).WithInterval(interval, scheduler);
}

使用示例:

var subscription = eventAsObservable
    .BatchWithInterval(50, TimeSpan.FromSeconds(10))
    .Subscribe(DoProcessing);
于 2021-01-11T01:58:35.187 回答
-1

试试这个:

    var bufferedItems = eventAsObservable
        .Buffer(50)
        .Do(_ => { Thread.Sleep(10000); });
于 2012-06-07T09:38:17.960 回答