5

我的服务器端向我发送了一批消息。批次中的消息数量和频率是任意的。有时,我每隔 1 分钟收到一次消息,有时一小时内没有消息。从 1 到 10 条消息。

我当前的实现用于Observable.Buffer(TimeSpan.FromSeconds(5))对消息进行分组并将其发送给订阅者。

如果两条消息之间有 x 秒的延迟,是否有办法配置 Observable,而不是每 5 秒检查一次。

如何避免不必要的计时器每 5 秒滴答一次?(我愿意接受其他优化批处理的建议。)

4

3 回答 3

10

使用 bufferClosingSelector 工厂方法

decPL 建议使用Buffer接受 a的重载bufferClosingSelector- 在打开新缓冲区时调用的工厂函数。它产生一个流,其第一个OnNext()OnCompleted()信号刷新当前缓冲区。decPLs 代码如下所示:

observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))

这在解决方案方面取得了相当大的进展,但它有几个问题:

  • 服务器不会在消息在限制持续时间内一致发布的活动期间发送消息。这可能会导致大量的、不经常发布的列表。
  • 源有多个订阅;如果天气很冷,这可能会产生意想不到的副作用。每次bufferClosingSelector缓冲区关闭后都会调用工厂,因此如果源是冷的,它将从初始事件开始节流,而不是最近的事件。

防止无限期节流

我们需要使用额外的机制来限制缓冲区长度并防止无限限制。Buffer有一个允许您指定最大长度的重载,但不幸的是,您不能将其与关闭选择器结合使用。

让我们调用所需的缓冲区长度限制n。回想一下,OnNext关闭选择器的第一个就足以关闭缓冲区,所以我们需要做的就是Merge使用一个计数流的节流阀,该计数流在来自源的n 个事件OnNext之后发送。我们可以用它来做到这一点;采取前n 个事件,但忽略除最后一个之外的所有事件。这是 Rx 中非常有用的模式。.Take(n).LastAsync()

让源“热”

为了解决bufferClosingSelector工厂重新订阅源的问题,我们需要使用.Publish().RefCount()源上的通用模式给我们一个只会将最近的事件发送给订阅者的流。这也是一个非常有用的模式要记住。

解决方案

这是重新编写的代码,其中油门持续时间与计数器合并:

var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;

// single subscription to source
var sourcePub = source.Publish().RefCount();

var output = sourcePub.Buffer(
    () => sourcePub.Throttle(throttleDuration) 
                   .Merge(sourcePub.Take(bufferSize).LastAsync()));

生产就绪代码和测试

这是一个带有测试的生产就绪实现(使用 nuget 包 rx-testing 和 nunit)。请注意调度程序的参数化以支持测试。

public static partial class ObservableExtensions
{
    public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
        this IObservable<TSource> source,
        TimeSpan maxInterval,
        int maxBufferSize,
        IScheduler scheduler)
    {
        if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
        if (maxBufferSize <= 0)
            throw new ArgumentOutOfRangeException(
                "maxBufferSize", "maxBufferSize must be positive");

        var publishedSource = source.Publish().RefCount();

        return publishedSource.Buffer(
            () => publishedSource
                .Throttle(maxInterval, scheduler)
                .Merge(publishedSource.Take(maxBufferSize).LastAsync()));
    }
}

public class BufferNearEventsTests : ReactiveTest
{
    [Test]
    public void CloseEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int> expectedBuffer = new [] {1, 2, 3};
        var expectedTime = maxInterval.Ticks + 300;

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
    }

    [Test]
    public void FarEventsAreUnbuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(1000, 1),
            OnNext(2000, 2),
            OnNext(3000, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1},
            new[] {2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            maxInterval.Ticks + 1000,
            maxInterval.Ticks + 2000,
            maxInterval.Ticks + 3000
        };  

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
            OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
    }

    [Test]
    public void UpToMaxEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 2;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1,2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            200, /* Buffer cap reached */
            maxInterval.Ticks + 300
        };

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
    }

    private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
    {
        CollectionAssert.AreEquivalent(expected, actual);
        return true;
    }
}
于 2013-11-15T12:34:33.753 回答
3

如果我正确理解了您的描述,Observable.Buffer仍然是您的朋友,只需使用导致可观察事件的重载来指示何时应发送缓冲项目。如下所示:

observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
于 2013-11-15T10:42:12.803 回答
1

这是一个老问题,但似乎与我最近的问题有关。Enigmativity 找到了一种很好的方法来做我认为你想要实现的事情,所以我想我会分享。我将解决方案包装在扩展方法中:

public static class ObservableExtensions
{
    public static IObservable<T[]> Batch<T>(this IObservable<T> observable, TimeSpan timespan)
    {
        return observable.GroupByUntil(x => 1, g => Observable.Timer(timespan))
                         .Select(x => x.ToArray())
                         .Switch();
    }
}

它可以像这样使用:

observableSource.Batch(TimeSpan.FromSeconds(5));
于 2015-04-25T12:50:12.537 回答