5

我有一个事件源,它生成属于某些组的事件。我想缓冲这些组并将这些组(分批)发送到存储。到目前为止,我有这个:

eventSource
    .GroupBy(event => event.GroupingKey)
    .Select(group => new { group.Key, Events = group })
    .Subscribe(group => group.Events
                            .Buffer(TimeSpan.FromSeconds(60), 100)
                            .Subscribe(list => SendToStorage(list)));

所以有一个嵌套订阅组中的事件。不知何故,我认为有更好的方法,但我还没有弄清楚。

4

2 回答 2

10

这是解决方案:

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));

以下是一些可以帮助您“减少”的一般规则:

1) 嵌套订阅通常固定Select在嵌套订阅之前的所有内容,然后是 a Merge,然后是嵌套订阅。所以应用它,你会得到这个:

eventSource
    .GroupBy(e => e.GroupingKey)
    .Select(group => new { group.Key, Events = group })
    .Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector
    .Merge()
    .Subscribe(list => SendToStorage(list));

2)您显然可以组合两个连续的选择(并且由于您没有对匿名对象做任何事情,因此可以删除它):

eventSource
    .GroupBy(e => e.GroupingKey)
    .Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Merge()
    .Subscribe(list => SendToStorage(list));

3) 最后, aSelect后跟 aMerge可以简化为 a SelectMany

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));
于 2017-02-24T15:18:32.163 回答
1

这是一种方法

(from g in eventSource.GroupByUntil(e => e.GroupingKey,
                                    g => g.Buffer(TimeSpan.FromSeconds(60), 100))
 from b in g.ToList()
 select b).Subscribe(SendToStorage);
于 2017-02-24T12:11:12.573 回答