10

最近,我一直试图了解 .NET 的 Reactive Extensions,但遇到了一些概念性问题:我无法弄清楚为什么 IObservable.First() 会阻塞。

我有一些看起来有点像这样的示例代码:

var a = new ListItem(a);
var b = new ListItem(b);
var c = new ListItem(c);
var d = new ListItem(d);

var observableList = new List<ListItem> { a,b,c,d }.ToObservable();

var itemA = observableList.First();

// Never reached
Assert.AreEqual(expectedFoo, itemA.Foo);

我期望发生的是itemA引用等于a并能够访问其成员等。相反,发生的是First()阻塞并且Assert.AreEqual()永远不会到达。

现在,我知道在使用 Rx 时,代码应该Subscribe()IObservables,所以很可能我在这里做错了。但是,根据各种方法签名来执行以下任一操作是不可能的:

observableList.First().Subscribe(item => Assert.AreEqual(expectedFoo, item));

或者

observableList.Subscribe(SomeMethod).First() // This doesn't even make sense, right?

我错过了什么?

4

2 回答 2

9

在测试项目中尝试这段代码效果很好,所以我重新审视了有问题的代码。原来问题是因为IObservable<ListItem>Publish()隐藏在某个地方,所以被转换成IConnectableObservable<ListItem>. 没有呼叫连接,订阅永远不会“激活”。

于 2011-08-03T10:42:09.727 回答
5

First() returns a T, not an Observable<T>, so it must block.

A non-blocking form is xs.Take(1)

于 2011-08-03T14:47:20.963 回答