简单的说
给定一个现有的 Observable(尚未完成),有没有办法检索关联的订阅者(传递给 subscribe 的函数)以使他们订阅另一个 Observable?
语境
我的应用程序中的一项服务有助于创建SeverEvent连接,将ConnectableObservable返回到代理连接并允许使用发布运算符进行多播。该服务通过内部存储跟踪现有连接:
store: {[key: string]: ConnectionTracker};
// …
interface ConnectionTracker {
url: string;
eventSource: EventSource;
observable: rx.ConnectableObservable<any>;
subscription: rx.Subscription;
observer: rx.Observer<any>;
data?: any; // Arbitrary data
}
在创建连接时,如果关联的跟踪器已经存在(使用连接的端点生成身份),服务应该:
- ok
关闭现有跟踪器的ServerEvent连接 - ok
打开一个新的SerevrEvent连接(因此是一个新的 ConnectableObservable) - 用新的 observable 替换现有跟踪器的 Observable,但现在让现有订阅者订阅新的 Observable
这是创建ConnectionTracker的代码部分
/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
let fullUri = endpoint + (queryString ? `?${queryString}` : '')
, tracker = this.findTrackerByEndpoint(endpoint) || {
observable: null,
fullUri: fullUri,
eventSource: null,
observer: null,
subscription: null
}
;
// Tracker exists
if (tracker.observable !== null) {
// If fullUri hasn't changed, use the tracker as is
if (tracker.fullUri === fullUri) {
return tracker;
}
// At this point, we know "fullUri" has changed, the tracker's
// connection should be replaced with a fresh one
// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
// them subscribe to the new Observable instead (created down below)
// Terminate previous connection and clean related resouces
tracker.observer.complete();
tracker.eventSource.close();
}
tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
// Executed once
tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
tracker.eventSource.onerror = e => observer.error(e);
// Keep track of the observer
tracker.observer = observer;
})
// Transform Observable into a ConnectableObservable for multicast
.publish()
;
// Start emitting right away and also keep a reference to
// proxy subscription for later disposal
tracker.subscription = tracker.observable.connect();
return tracker;
}
谢谢你。