情况:
我遇到了一个 rxjsObservable
系统的用例,我可能需要在它启动后将pipe
d 命令添加到 a中。Subscription
就我而言,我正在处理的应用程序必须被动地收听推送通知系统。可以通过此系统推送许多消息,我的系统需要对其进行响应。 但是,有一种可预见的情况,即未来将要实现的动态加载视图将需要向推送通知系统添加监听器。
问题:
鉴于我的应用程序处于我已经存在的状态,我可以在调用Subscription
后添加一个额外的管道吗?.subscribe(() => {})
// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });
this.something.pipe(
map((something) => {
// ...Commands that I want to add to the subscription...
})
);
...如果我这样做,那么会发生什么,如果有的话?
解决方案:
@user2216584 和@SerejaBogolubov 的两个答案都包含这个问题的答案。
我的高级推送通知侦听器服务需要做两件事:
- 继续订阅,然后
- 能够从听众列表中提取。
复杂之处在于每个侦听器都需要侦听不同的消息。换句话说,如果我在 上收到消息foo_DEV
,应用程序需要做的事情与推送通知系统在 上推送消息时不同bar_DEV
。
所以,这就是我想出的:
export interface PushNotificationListener {
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any,
messageSubject$: Subject<PushNotificationMessage>
}
export class PushNotificationListenerService {
private connection$: Observable<PushNotificationConnection>;
private subscription$: Subscription;
private listeners: PushNotificationListener[] = [];
constructor(
private connectionManager: PushNotificationConnectionManager
) {
}
connect() {
// Step 1 - Open the socket connection!
this.connection$ = this.connectionManager.connect(
// The arguments for setting up the websocket are unimportant here.
// The underlying implementation is similarly unimportant.
);
}
setListener(
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any
) {
// Step 3...or maybe 2...(shrug)...
// Set listeners that the subscription to the high-order connection
// will employ.
const newListener: PushNotificationListener = {
name: name,
onMessageReceived: onMessageReceived,
messageSubject$: null
};
this.listeners.push(newListener);
}
listen() {
// Step 2 - Listen for changes to the high-order connection observable.
this.subscription$ = this.connection$
.subscribe((connection: PushNotificationConnection) => {
console.info('Push notification connection established');
for (let listener of this.listeners) {
listener.messageSubject$ = connection.subscribe(listener.name);
listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
listener.onMessageReceived(message);
}
}
},
(error: any) => {
console.warn('Push notification connection error', error);
}
}
}
通过仔细研究构成我的推送通知系统核心的内部代码,我发现我们已经有了一个高阶Observable
. websocket 代码创建了一个 observable ( connectionManager.connect()
),它需要被缓存在服务中并被订阅。由于该代码特定于我工作的地方,我不能再多说了。
但是,缓存侦听器也很重要!每当连接更改状态时,subscribe
调用.listen()
都会遍历所有附加的侦听器,因此我可以通过 临时添加侦听器.addListener()
,并且由于 rxjs 的Observable
系统本质上是如何工作的,以及我正在从范围内列表工作的事实的监听器,我有一个系统,我可以动态设置监听器,即使.connect()
在配置任何监听器之前被调用。
这段代码可能仍然可以从重新设计/重构中受益,但我有一些有用的东西,这是任何良好编码的重要第一步。谢谢你们!