我在可观察的递归链上遇到了一些麻烦。
我正在使用 RxJS,它目前的版本为 1.0.10621,包含最基本的 Rx 功能,以及用于 jQuery 的 Rx。
让我为我的问题介绍一个示例场景:我正在轮询Twitter 搜索 API(JSON 响应)以获取包含某个关键字的推文/更新。响应还包括一个“refresh_url”,应该使用它来生成后续请求。对该后续请求的响应将再次包含新的 refresh_url 等。
Rx.jQuery 允许我将 Twitter 搜索 API 调用变成一个可观察的事件,该事件会产生一个 onNext 然后完成。到目前为止,我尝试的是让 onNext 处理程序记住 refresh_url 并在 onCompleted 处理程序中使用它来为下一个请求生成一个新的 observable 和相应的观察者。这样,一对可观察者 + 观察者对无限期地跟随另一对。
这种方法的问题是:
后续的 observable/observer 在其前辈还没有被处理掉的时候就已经存活了。
我必须做很多令人讨厌的簿记才能保持对当前活着的观察者的有效引用,其中实际上可能有两个。(一个在 onCompleted 中,另一个在其生命周期的其他地方)当然,需要取消订阅/处置观察者。簿记的替代方法是通过“仍在运行?”布尔值来实现副作用,正如我在示例中所做的那样。
示例代码:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function () {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function () {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
不要被从请求到推文的双层 observables/observers 所迷惑。我的示例主要关注第一层:从 Twitter 请求数据。如果在解决这个问题时,第二层(将响应转换为推文)可以与第一层合二为一,那就太棒了;但我认为那是完全不同的事情。目前。
Erik Meijer 向我指出了 Expand 运算符(参见下面的示例),并建议将Join 模式作为替代方案。
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => ( i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
这应该可以复制粘贴到 LINQPad 中。它假设单例 observables 并产生一个最终的观察者。
所以我的问题是:如何在 RxJS 中做最好的扩展技巧?
编辑:扩展运算符可能可以按照此线程
中所示的方式实现。但是需要生成器(我只有 JS < 1.6)。
不幸的是,RxJS 2.0.20304-beta没有实现 Extend 方法。