1

我正在尝试学习反应式编程,作为第一个“真正的”应用程序,我选择了一个简单的 IRC 客户端,带有RxJavaRxNetty

如果第一个服务器失败,我目前正在重试另一台服务器。所以,我有一个Observable<Server>,它由网络的服务器组成。它可以使用 设置.repeat(),因此它无限期地重复服务器。RxClient::connect现在,我如何使它成为一个阻塞的,以便一次只使用一个,并且只有在失败或连接超时时才会转到下一个?

虽然其他解决方案是使用域,每次我连接并使用时都会将我重定向到不同的服务器.retry(),但我有兴趣以反应方式解决问题。

连接到所有服务器(不带.repeat())工作正常,但这不是我想要的:

servers
    .map(x -> RxNetty.createTcpClient(
        x.getAddress(),
        x.getPort(),
        PipelineConfigurators.stringMessageConfigurator()))
    .flatMap(RxClient::connect)
    .onErrorFlatMap(x -> Observable.empty())
    .subscribe(this::handleConnection);
4

1 回答 1

2

如何使用concatMap而不是flatMap. RxClient::connect因此,只有在完成或错误时才会使用下一个服务器:

servers
    .map(x -> RxNetty.createTcpClient(
        x.getAddress(),
        x.getPort(),
        PipelineConfigurators.stringMessageConfigurator()))
    .concatMap(RxClient::connect)
    .onErrorFlatMap(x -> Observable.empty())
    .subscribe(this::handleConnection);
于 2015-09-12T10:04:19.103 回答