1

我正在为 Stomp over Websockets 构建一个小 RxJS 包装器,它已经可以工作了。

但是现在我有了一个非常酷的功能的想法,使用 RxJS 可以很容易地完成这个功能(希望 - 如果我错了,请纠正我)。

当前行为:

myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();      // onSuccess: set state to CONNECTED

// state (Observable) can be DISCONNECTED or CONNECTED
var subscription = myStompWrapper.getState()
    .filter(state => state == "CONNECTED")
    .flatMap(myStompWrapper.subscribeDestination("/foo"))
    .subscribe(msg => console.log(msg));

// ... and some time later:
subscription.unsubscribe();    // calls 'unsubscribe' for this stomp destination
myStompWrapper.disconnect();   // disconnects the stomp websocket connection

如您所见,我必须等待state == "CONNECTED"才能订阅subscribeDestination(..)。否则我会从 Stomp 库中得到一个错误。

新行为:

下一个实现应该使用户更容易。这是我的想象:

myStompWrapper.configure("/stomp_endpoint");

var subscription = myStompWrapper.subscribeDestination("/foo")
    .subscribe(msg => console.log(msg));

// ... and some time later:
subscription.unsubscribe();

它应该如何在内部工作:

  1. configure只能调用 whileDISCONNECTED
  2. subscribeDestination被调用时,有两种可能性:
    1. if CONNECTED: 只订阅目的地
    2. if DISCONNECTED:首先调用connect(),然后订阅目的地
  3. unsubscribe被调用时,有两种可能性:
    1. 如果这是最后一次订阅:调用disconnect()
    2. 如果这不是最后一次订阅:什么都不做

我还不确定如何到达那里,但这就是我在这里问这个问题的原因;-)

提前致谢!

编辑:更多代码、示例和解释

configure()在断开连接的情况下被调用时,它应该抛出一个Error. 但这没什么大不了的。

stompClient.connect(..)是非阻塞的。它有一个onSuccess回调:

public connect() {
  stompClient.connect({}, this.onSuccess, this.errorHandler);
}

public onSuccess = () => {
  this.state.next(State.CONNECTED);
}

observeDestination(..)订阅一个 Stomp 消息通道(= 目的地)并返回一个Rx.Observable然后可以用来取消订阅这个 Stomp 消息通道:

public observeDestination(destination: string) {
  return this.state
      .filter(state => state == State.CONNECTED)
      .flatMap(_ => Rx.Observable.create(observer => {
        let stompSubscription = this.client.subscribe(
            destination,
            message => observer.next(message),
            {}
        );

        return () => {
          stompSubscription.unsubscribe();
        }
      }));
}

它可以这样使用:

myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();

myStompWrapper.observeDestination("/foo")
    .subscribe(..);

myStompWrapper.observeDestination("/bar")
    .subscribe(..);

现在我想摆脱myStompWrapper.connect(). 该代码应this.connect()在第一个通过调用订阅时自动调用,observeDestination(..).subscribe(..)this.disconnect()在最后一个调用时调用unsubscribe()

例子:

myStompWrapper.configure("/stomp_endpoint");

let subscription1 = myStompWrapper.observeDestination("/foo")
    .subscribe(..); // execute connect(), because this
                    // is the first subscription

let subscription2 = myStompWrapper.observeDestination("/bar")
    .subscribe(..);

subscription2.unsubscribe();
subscription1.unsubscribe(); // execute disconnect(), because this 
                             // was the last subscription
4

1 回答 1

0

RxJS:使用 Websockets 和 Stomp 自动(断开)连接(取消)订阅

我同意你建议隐藏在 myStompWrapper 中的代码在它的新家中会更快乐。

我仍然建议使用类似的名称,observeDestination而不是subscribeDestination("/foo")因为您实际上并没有订阅该方法,而只是完成了您的可观察链。

  1. configure()只能调用 whileDISCONNECTED

    您没有在此处指定如果在 not 时调用它会发生什么DISCONNECTED。由于您似乎没有在这里返回您将使用的任何值,因此我假设您打算在异常状态不方便时抛出异常。为了跟踪这些状态,我会使用以BehaviourSubject. 开头的初始值DISCONNECTED。您可能希望保持状态observeDestination以决定是否抛出异常

  2. 如果已连接:只需订阅目的地

    if DISCONNECTED:首先调用connect(),然后订阅目的地

    正如我之前提到的,我认为如果订阅不在内部发生,subscribeDestination("/foo")而是你只是建立你的可观察链,你会更快乐。由于您只是想connect()在某些情况下调用,我会简单地.do()在您的可观察链中使用包含状态条件的调用。

  3. 要使用 rx-y 逻辑,您可能希望调用disconnect()作为 observable unsubscribe 的一部分,并简单地返回一个共享的 refcounted observable 开始。这样,每个新订阅者都不会重新创建新订阅,而是.refCount()将对可观察链进行一次订阅,并且unsubscribe()一旦下游不再有订阅者。

假设消息以this.observedData$的形式出现在myStompWrapper我的建议代码中,则myStompWrapper看起来像这样:

observeDestination() {
  return Rx.Observable.create(function (observer) {
     var subscription = this.getState()
             .filter(state => state == "CONNECTED")
             .do(state => state ? this.connect() : Observable.of(true))
             .switchMap(this.observedData$)
             .refCount();
             .subscribe(value => {
               try {
                 subscriber.next(someCallback(value));
               } catch(err) {
                 subscriber.error(err);
               }
             },
             err => subscriber.error(err),
             () => subscriber.complete());

 return { unsubscribe() { this.disconnect(); subscription.unsubscribe(); } };
}

因为我遗漏了您的一些代码,所以我允许自己不测试我的代码。但希望它能说明并呈现我在回答中提到的概念。

于 2017-03-05T00:27:06.380 回答