我正在为 Stomp over Websockets 构建一个小 RxJS 包装器,它已经可以工作了。
但是现在我有了一个非常酷的功能的想法,使用 RxJS 可以很容易地完成这个功能(希望 - 如果我错了,请纠正我)。
当前行为:
myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect(); // onSuccess: set state to CONNECTED
// state (Observable) can be DISCONNECTED or CONNECTED
var subscription = myStompWrapper.getState()
.filter(state => state == "CONNECTED")
.flatMap(myStompWrapper.subscribeDestination("/foo"))
.subscribe(msg => console.log(msg));
// ... and some time later:
subscription.unsubscribe(); // calls 'unsubscribe' for this stomp destination
myStompWrapper.disconnect(); // disconnects the stomp websocket connection
如您所见,我必须等待state == "CONNECTED"
才能订阅subscribeDestination(..)
。否则我会从 Stomp 库中得到一个错误。
新行为:
下一个实现应该使用户更容易。这是我的想象:
myStompWrapper.configure("/stomp_endpoint");
var subscription = myStompWrapper.subscribeDestination("/foo")
.subscribe(msg => console.log(msg));
// ... and some time later:
subscription.unsubscribe();
它应该如何在内部工作:
configure
只能调用 whileDISCONNECTED
- 当
subscribeDestination
被调用时,有两种可能性:- if
CONNECTED
: 只订阅目的地 - if
DISCONNECTED
:首先调用connect()
,然后订阅目的地
- if
- 当
unsubscribe
被调用时,有两种可能性:- 如果这是最后一次订阅:调用
disconnect()
- 如果这不是最后一次订阅:什么都不做
- 如果这是最后一次订阅:调用
我还不确定如何到达那里,但这就是我在这里问这个问题的原因;-)
提前致谢!
编辑:更多代码、示例和解释
当configure()在未断开连接的情况下被调用时,它应该抛出一个Error
. 但这没什么大不了的。
stompClient.connect(..)是非阻塞的。它有一个onSuccess
回调:
public connect() {
stompClient.connect({}, this.onSuccess, this.errorHandler);
}
public onSuccess = () => {
this.state.next(State.CONNECTED);
}
observeDestination(..)订阅一个 Stomp 消息通道(= 目的地)并返回一个Rx.Observable
然后可以用来取消订阅这个 Stomp 消息通道:
public observeDestination(destination: string) {
return this.state
.filter(state => state == State.CONNECTED)
.flatMap(_ => Rx.Observable.create(observer => {
let stompSubscription = this.client.subscribe(
destination,
message => observer.next(message),
{}
);
return () => {
stompSubscription.unsubscribe();
}
}));
}
它可以这样使用:
myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();
myStompWrapper.observeDestination("/foo")
.subscribe(..);
myStompWrapper.observeDestination("/bar")
.subscribe(..);
现在我想摆脱myStompWrapper.connect()
. 该代码应this.connect()
在第一个通过调用订阅时自动调用,observeDestination(..).subscribe(..)
并this.disconnect()
在最后一个调用时调用unsubscribe()
。
例子:
myStompWrapper.configure("/stomp_endpoint");
let subscription1 = myStompWrapper.observeDestination("/foo")
.subscribe(..); // execute connect(), because this
// is the first subscription
let subscription2 = myStompWrapper.observeDestination("/bar")
.subscribe(..);
subscription2.unsubscribe();
subscription1.unsubscribe(); // execute disconnect(), because this
// was the last subscription