2

情况:
我遇到了一个 rxjsObservable系统的用例,我可能需要在它启动后将piped 命令添加到 a中。Subscription

就我而言,我正在处理的应用程序必须被动地收听推送通知系统。可以通过此系统推送许多消息,我的系统需要对其进行响应。 但是,有一种可预见的情况,即未来将要实现的动态加载视图将需要向推送通知系统添加监听器。

问题:
鉴于我的应用程序处于我已经存在的状态,我可以在调用Subscription后添加一个额外的管道吗?.subscribe(() => {})

// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

...如果我这样做,那么会发生什么,如果有的话?

解决方案:
@user2216584 和@SerejaBogolubov 的两个答案都包含这个问题的答案。

我的高级推送通知侦听器服务需要做两件事:

  1. 继续订阅,然后
  2. 能够从听众列表中提取。

复杂之处在于每个侦​​听器都需要侦听不同的消息。换句话说,如果我在 上收到消息foo_DEV,应用程序需要做的事情与推送通知系统在 上推送消息时不同bar_DEV

所以,这就是我想出的:

export interface PushNotificationListener {
  name: string,
  onMessageReceived: (msg: PushNotificationMessage) => any,
  messageSubject$: Subject<PushNotificationMessage>
}

export class PushNotificationListenerService {
  private connection$: Observable<PushNotificationConnection>;
  private subscription$: Subscription;

  private listeners: PushNotificationListener[] = [];

  constructor(
    private connectionManager: PushNotificationConnectionManager
  ) {
  }

  connect() {
    // Step 1 - Open the socket connection!
    this.connection$ = this.connectionManager.connect(
      // The arguments for setting up the websocket are unimportant here.
      // The underlying implementation is similarly unimportant.
    );
  } 

  setListener(
    name: string,
    onMessageReceived: (msg: PushNotificationMessage) => any
  ) {
    // Step 3...or maybe 2...(shrug)...
    // Set listeners that the subscription to the high-order connection
    // will employ.
    const newListener: PushNotificationListener = {
      name: name,
      onMessageReceived: onMessageReceived,
      messageSubject$: null
    };

    this.listeners.push(newListener);
  }

  listen() {
    // Step 2 - Listen for changes to the high-order connection observable.
    this.subscription$ = this.connection$
      .subscribe((connection: PushNotificationConnection) => {
        console.info('Push notification connection established');

        for (let listener of this.listeners) {
         listener.messageSubject$ = connection.subscribe(listener.name);
         listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
           listener.onMessageReceived(message);
         }
        }
      },
      (error: any) => {
        console.warn('Push notification connection error', error);
      }
  }
}

通过仔细研究构成我的推送通知系统核心的内部代码,我发现我们已经有了一个高阶Observable. websocket 代码创建了一个 observable ( connectionManager.connect()),它需要被缓存在服务中并被订阅。由于该代码特定于我工作的地方,我不能再多说了。

但是,缓存侦听器也很重要!每当连接更改状态时,subscribe调用.listen()都会遍历所有附加的侦听器,因此我可以通过 临时添加侦听器.addListener(),并且由于 rxjs 的Observable系统本质上是如何工作的以及我正在从范围内列表工作的事实的监听器,我有一个系统,我可以动态设置监听器,即使.connect()在配置任何监听器之前被调用。

这段代码可能仍然可以从重新设计/重构中受益,但我有一些有用的东西,这是任何良好编码的重要第一步。谢谢你们!

4

2 回答 2

3

[我正在编辑我的答案,因为之前的答案是根据作者分享的第一个代码;如评论中所述,作者已更改/更正了代码] -

我怀疑以下代码会影响订阅中的任何内容-

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

您可以在最初设置 observable 时尝试更高阶函数,如果更高阶函数在范围内,您可以重新分配它。我也怀疑它是否会起作用,原因如下:

  1. 设置 Observable 后,observable 会保留传递的函数的引用,该函数将在订阅时调用 [ https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87]。现在,如果您重新分配高阶函数,则可观察函数仍指向旧引用。通过重新分配高阶函数,您并没有更改最初设置 observable 时设置的原始函数引用。

  2. 假设由于某种原因,高阶重新分配有效,在这种情况下,也很有可能在执行旧的高阶函数之前,您可能已经重新分配了高阶函数(因为如果源 observable 对后端进行异步调用,而等待代码,javascript 事件循环可能已经重新分配了高阶函数,当异步调用返回时,它将执行新分配的高阶函数)。也许这段代码会澄清我的观点-

让 highOrderFunc = map(x => x * 2);

this.something
    .pipe(
          mergeMap(_ => //call to backend; async call),
          higherOrderFunc,
         ).subscribe();
higherOrderFunc = map(x => x * 3); // this will execute before async call completes
于 2019-06-07T19:43:34.533 回答
2

好吧,你可以很容易地做到这一点。比如说,你想要一些运行时延迟的map. 比您执行类似的操作map(this.myMapper)myMapper在适当范围内可见的私有字段在哪里。通过改变该私有字段,您可以添加/删除其他行为。例如,map(x => x)这意味着没有任何映射。

但是,在我看来,您正在滥用rxjs. 很可能您真正需要的是正确的高阶可观察对象(发出可观察对象的可观察对象,“流的流”)。那将是更rxjs简洁和更清洁的解决方案。所以三思而后行。

于 2019-06-07T19:57:10.773 回答