
I need to access a websocket service that closes an open websocket connection after 24 hours. How do I implement reconnection with Spring Boot 2 and WebFlux?

This is what I have so far (taken from https://github.com/artembilan/webflux-websocket-demo):

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {
    ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();

    EmitterProcessor<String> output = EmitterProcessor.create();

    Mono<Void> sessionMono = client.execute(new URI("ws://localhost:8080/echo"),

    session -> session.receive()
    .timeout(Duration.ofSeconds(3))
    .map(WebSocketMessage::getPayloadAsText)
    .subscribeWith(output)
    .then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}

Once the connection is lost (no input for 3 seconds), a TimeoutException is thrown. But how do I reconnect the socket?


2 Answers


There is no out-of-the-box solution; a reconnection mechanism is not part of JSR 356 - Java API for WebSocket. But you can implement it yourself. Here is a simple example using Spring events:

Step 1 - Create an event class.

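// Named ReconnectEvent so it matches the type used in connect() and in the listener.
public class ReconnectEvent extends ApplicationEvent {

    // URL of the websocket endpoint to reconnect to
    private final String url;

    public ReconnectEvent(String url) {
        super(url);
        this.url = url;
    }

    public String getUrl() {
        return url;
    }

}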

Step 2 - Provide a method that opens the websocket connection. For example:

...
@Autowired
private ApplicationEventPublisher publisher;
...
public void connect(String url) {

    ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();

    EmitterProcessor<String> output = EmitterProcessor.create();

    Mono<Void> sessionMono = client.execute(URI.create(url),

            session -> session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .log()
                    .subscribeWith(output)
                    .doOnTerminate(() -> publisher.publishEvent(new ReconnectEvent(url)))
                    .then());

    output
            .doOnSubscribe(s -> sessionMono.subscribe())
            .subscribe();
}

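Note the doOnTerminate() call: it publishes a ReconnectEvent whenever the Flux terminates, whether it completes normally or with an error. If needed, you can publish the reconnect event from a different Flux callback instead (for example, only from doOnError()).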

Step 3 - Provide a listener that connects to the given URL again whenever a reconnect event occurs.

@EventListener(ReconnectEvent.class)
public void onApplicationEvent(ReconnectEvent event) {
    connect(event.getUrl());
}
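
The three steps, together with the choice between doOnTerminate() and doOnError(), can be sketched in plain Java without Spring or Reactor. This is only an illustration of the control flow, not the code from this answer: ReconnectEventSketch, Session, Outcome, the onlyOnError flag and the maxAttempts cap are all hypothetical names, and the listener list merely stands in for ApplicationEventPublisher and @EventListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for the Spring event flow; every name below is hypothetical.
class ReconnectEventSketch {

    /** How one simulated session ended. */
    enum Outcome { COMPLETED, FAILED }

    /** Stand-in for a websocket session: runs once and reports how it ended. */
    @FunctionalInterface
    interface Session {
        Outcome run(String url, int attempt);
    }

    // Stand-in for ApplicationEventPublisher plus its registered @EventListener methods.
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private final Session session;
    private final boolean onlyOnError; // true mimics doOnError(), false mimics doOnTerminate()
    private final int maxAttempts;     // assumption: cap attempts so the demo always stops
    private int attempts = 0;

    ReconnectEventSketch(Session session, boolean onlyOnError, int maxAttempts) {
        this.session = session;
        this.onlyOnError = onlyOnError;
        this.maxAttempts = maxAttempts;
        // Step 3 equivalent: the listener reconnects to the URL carried by the event.
        listeners.add(this::connect);
    }

    /** Step 2 equivalent: run one session, then publish a reconnect event when it ends. */
    void connect(String url) {
        if (attempts >= maxAttempts) {
            return; // give up instead of reconnecting forever
        }
        attempts++;
        Outcome outcome = session.run(url, attempts);
        if (!onlyOnError || outcome == Outcome.FAILED) {
            publish(url);
        }
    }

    private void publish(String url) {
        for (Consumer<String> listener : listeners) {
            listener.accept(url);
        }
    }

    int attempts() {
        return attempts;
    }

    public static void main(String[] args) {
        // Fails once, then completes; with onlyOnError=true no event follows the clean completion.
        ReconnectEventSketch client = new ReconnectEventSketch(
                (url, attempt) -> attempt < 2 ? Outcome.FAILED : Outcome.COMPLETED, true, 5);
        client.connect("ws://localhost:8080/echo");
        System.out.println("sessions started: " + client.attempts());
    }
}
```

With onlyOnError set to false, every termination triggers a new connection, which is what a server that closes the socket after 24 hours calls for. The attempt cap exists only to keep the sketch finite; the Spring version above has no such limit.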
answered 2018-03-22T13:38:41.517

I did something similar using Reactor's UnicastProcessor.

...
public abstract class AbstractWsReconnectClient {
    private Logger ...

    protected UnicastProcessor<AbstractWsReconnectClient> reconnectProcessor = UnicastProcessor.create();

    protected AbstractWsReconnectClient(Duration reconnectDuration) {
        reconnect(reconnectDuration);
    }

    public abstract Mono<Void> connect();

    private void reconnect(Duration duration) {
        reconnectProcessor.publish()
            .autoConnect()
            .delayElements(duration)
            .flatMap(AbstractWsReconnectClient::connect)
            .onErrorContinue(throwable -> true,
                (throwable, o) -> {
                    if (throwable instanceof ConnectException) {
                        logger.warn(throwable.getMessage());
                    } else {
                        logger.error("unexpected error occur during websocket reconnect");
                        logger.error(throwable.getMessage());
                    }
                })
            .doOnTerminate(() -> logger.error("websocket reconnect processor terminate "))
            .subscribe();
    }
}

When the WebSocketClient terminates, call UnicastProcessor.onNext:

public Mono<Void> connect() {
    WebSocketClient client = new ReactorNettyWebSocketClient();
    logger.info("trying to connect to sso server {}", uri.toString());
    return client.execute(uri, headers, ssoClientHandler)
        .doOnTerminate(() -> {
            logger.warn("sso server {} disconnect", uri.toString());
            super.reconnectProcessor.onNext(this);
        });
}
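
The signal flow of this approach (a terminated connection pushes the client into a queue, and a consumer waits a fixed delay and connects again) can be shown as a simplified, synchronous plain-Java sketch. It is not the Reactor pipeline above: QueueReconnectSketch, CountingClient, requestReconnect and processSignals are hypothetical names, the ArrayDeque stands in for the UnicastProcessor, and everything runs on the calling thread.

```java
import java.net.ConnectException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

// Synchronous, single-threaded stand-in for the UnicastProcessor pipeline; names are hypothetical.
abstract class QueueReconnectSketch {

    // Stand-in for reconnectProcessor: holds pending reconnect signals.
    private final Queue<QueueReconnectSketch> signals = new ArrayDeque<>();
    private final Duration delay;

    protected QueueReconnectSketch(Duration delay) {
        this.delay = delay;
    }

    /** Opens one connection and returns (or throws) when it ends. */
    abstract void connect() throws Exception;

    /** Equivalent of reconnectProcessor.onNext(this). */
    void requestReconnect() {
        signals.add(this);
    }

    /** Handles up to maxSignals signals and returns how many were handled. */
    int processSignals(int maxSignals) {
        int handled = 0;
        while (handled < maxSignals && !signals.isEmpty()) {
            QueueReconnectSketch client = signals.poll();
            try {
                Thread.sleep(delay.toMillis()); // delayElements(duration) equivalent
                client.connect();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop the loop when the thread is interrupted
            } catch (Exception e) {
                // onErrorContinue equivalent: report the failure and keep the loop alive
                System.err.println("reconnect failed: " + e.getMessage());
            }
            handled++;
        }
        return handled;
    }
}

/** Demo client: fails its first `failures` attempts and always asks to be reconnected. */
class CountingClient extends QueueReconnectSketch {
    private final int failures;
    private int connects = 0;

    CountingClient(Duration delay, int failures) {
        super(delay);
        this.failures = failures;
    }

    @Override
    void connect() throws Exception {
        connects++;
        try {
            if (connects <= failures) {
                throw new ConnectException("attempt " + connects + " refused");
            }
        } finally {
            requestReconnect(); // doOnTerminate equivalent: runs on success and on failure
        }
    }

    int connects() {
        return connects;
    }
}
```

A caller would create a CountingClient, call requestReconnect() once to start, and then call processSignals(n). Because each connect() call enqueues the next signal, the loop keeps going until the limit passed to processSignals is reached, much as the Reactor version keeps reconnecting for as long as the processor stays subscribed.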
answered 2018-12-19T08:22:09.003