4

这与我的其他问题有关James World提出了如下解决方案:

// idStream is an IObservable<int> of the input stream of IDs
// alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID
var idAlarmStream = idStream
.GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key)))
.SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));

<编辑2:

问题:如何在不等待第一个事件到达的情况下立即启动计时器?我猜这是我问题的根本问题。为此,我计划发送带有我知道应该存在的 ID 的虚拟对象。但正如我在下文中所写,我遇到了一些其他问题。不过,我认为解决这个问题也会很有趣。

然后转发其他有趣的部分!现在,如果我想像下面这样对一个复杂的对象进行分组并按如下键分组(不会编译)

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
    .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));

然后我就有麻烦了。我无法修改 about 部分SelectManyConcat因此Observable.Return查询可以像以前一样工作。例如,如果我查询为

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
    .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key.First())))
    .Subscribe(i => Console.WriteLine(i.Id + "-" + i.IsTest);

然后需要两个事件才能在 中观察到输出Subscribe。我认为这是调用 to 的效果First。此外,我也想在调用中使用复杂的对象属性alarmInterval

有人可以解释发生了什么,甚至可以提供解决方案吗?使用未修改的解决方案的问题在于,分组不仅看 Ids 作为键值,还看 IsTest 字段。

<编辑:作为说明,这个问题可能首先可以通过创建一个显式的类或结构来解决,然后实现一个自定义IEquatable,然后按原样使用 James 的代码,以便仅通过 ID 进行分组。感觉就像黑客一样。

4

2 回答 2

1

您正在 Select 中创建匿名类型。让我们称之为A1。我会假设你的 idStream 是一个 IObservable。由于这是其中的关键,因此GroupByUntil您无需担心关键比较 - int 相等性很好。

GroupByUntil一个IObservable<IGroupedObservable<int, A1>>.

所写的 SelectMany 试图成为IObservable<A1>. 您只需要在Concat(Observable.Return(grp.Key))这里 - 但是 Key 的类型和 Group 元素的类型必须匹配,否则 SelectMany 将不起作用。所以钥匙也必须是A1。匿名类型使用结构相等,返回类型将是 A1 的流 - 但您不能将其声明为公共返回类型。

如果你只想要 ID,你应该在.Select(x => x.Id)后面添加一个Throttle

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)
                                           .Select(x => x.Id))
    .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));

如果您想要 A1 - 您需要创建一个实现 Equality 的具体类型。

编辑

我没有测试过,但你也可以像这样更简单地把它弄平,我认为这更容易!但是它正在输出 A1,因此如果您需要在某处返回流,则必须处理该问题。

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))
    .SelectMany(grp => grp.TakeLast(1));
于 2013-10-17T13:19:07.610 回答
1

此外,如果您想计算在闹钟响起之前您看到某个项目的次数,您可以这样做,利用Select.

var idAlarmStream = idStream
    .Select(i => new { Id = i, IsTest = true })
    .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))
    .SelectMany(grp => grp.Select((count, alarm) => new { count, alarm }).TakeLast(1));

请注意,对于第一个(种子)项目,这将是 0 - 这可能是您想要的。

于 2013-10-17T13:55:13.747 回答