我目前正在努力尝试使用 rx 实现 tcp 看门狗/重试系统,非常感谢您的帮助。
有一个 Observable,我希望有一个 Observable,它会定期检查我们是否仍然可以写入套接字。很简单,我可以这样做:
class SocketSubscribeFunc implements Observable.OnSubscribeFunc<Socket> {
private final String hostname;
private final int port;
private Socket socket;
SocketSubscribeFunc(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
public Subscription onSubscribe(final Observer<? super Socket> observer) {
try {
log.debug("Trying to connect...");
socket = new Socket(hostname, port);
observer.onNext(socket);
} catch (IOException e) {
observer.onError(e);
}
return new Subscription() {
public void unsubscribe() {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
};
}
}
Observable<Socket> socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port));
Observable<Boolean> watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2<Socket, Long, Boolean>() {
public Boolean call(final Socket socket, final Long aLong) {
try {
socket.getOutputStream().write("ping\n".getBytes());
return true;
} catch (IOException e) {
return false;
}
}
});
现在,如果可以获取套接字(服务器/链接在创建时关闭)或变得不可写(成功连接后服务器/链接无法访问),我想重试连接。理想情况下,通过重新订阅其 OnSubscribeFunc 使用重试运算符创建连接的套接字 Observable。如您所见,这会在套接字和看门狗 Observables 之间引入循环依赖。我玩弄了一段时间 switchMap/materialize... 为了传播最终的错误无济于事。
我接近放弃这个想法并使用副作用代码中的主题。但是在全球范围内应该有更好的方法:)
提前致谢!