0

我有一个来自 Reactive 扩展框架的 IObservable [在下面的示例中命名的行],我想为它观察到的每个对象添加索引号。

我尝试使用 Zip 函数来实现这一点:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

..但不幸的是,这会引发

ArgumentOutOfRangeException:指定的参数超出了有效值的范围。参数名称:一次性用品

我对 Zip 函数的理解是错误的还是我的代码有问题?

代码的 Range 部分似乎不是问题,并且 IObservable 尚未接收任何事件。

4

3 回答 3

1

.Select 具有包含索引的重载:

rows.Select((row, index) => new { row, index });
于 2010-04-25T02:12:35.573 回答
0

显然,Zip 扩展方法将原始自定义 IObservable 转换为匿名 observable 并订阅它会创建一个 System.Collections.Generic.AnonymousObserver,它不实现 IDisposable。因此,您无法以正常方式实现 Subscribe 方法(至少我看到它使用的方式),即

public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  return observer as IDisposable
}

更可能的正确答案是:

return Disposable.Create(() => Observers.Remove(observer));

您应该注意,collction 可能会在 Completed-method 期间被修改,因此在处理它们之前创建列表的副本:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
 }
于 2010-03-25T12:58:01.897 回答
0

我不确定您的问题是什么,这对您有用吗(以及您正在做什么这里缺少什么?):

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
于 2010-03-28T13:51:23.013 回答