我使用的是 Project Reactor 3.0.4.RELEASE。从概念上讲，这在 RxJava 中也应该是一样的。
/**
 * Refreshes the pods belonging to the given applications and merges the per-element results
 * into a single map.
 *
 * <p>Visible pipeline: {@code pods(apps)} is flat-mapped per emitted element {@code g}; the
 * maps produced by that inner step are flattened into their entries and re-collected into one
 * {@code Map} keyed by each entry's key.
 *
 * <p>NOTE(review): this listing is abridged. The body of the {@code g -> g ...} lambda and the
 * method's closing brace are not shown, so what is done to each {@code g} cannot be told from
 * here. {@code g} is presumably a grouped flux (one group per application) -- TODO confirm.
 *
 * @param apps application names handed to {@code pods(...)}
 * @return a {@code Mono} emitting the combined map of entry key to boolean value
 */
public Mono<Map<String, Boolean>> refreshPods(List<String> apps) {
return pods(apps)
// Per-element processing; the operators applied to g are missing from this excerpt.
.flatMap(g -> g
// TODO: This doesn't seem to be working as expected
// Flatten every emitted map into a stream of its entries...
.flatMap(m -> Flux.fromIterable(m.entrySet()))
// ...and gather all entries back into a single map.
.collectMap(Map.Entry::getKey, Map.Entry::getValue);
/**
 * {@code MyServiceClient} implementation that refreshes pods by POSTing to each pod's
 * {@code /refresh} HTTP endpoint, retrying failed attempts according to
 * {@code ConfigRefreshProperties}.
 *
 * <p>NOTE(review): this listing is abridged. Several statements and most closing braces are
 * missing, so the comments below describe only what the visible lines show.
 */
public class MyServiceClientImpl implements MyServiceClient {
// HTTP client used by refresh(...) to issue the POST request.
private final RestOperations restOperations;
// Source of the retry settings read below: getMaxRetries() and getRetryDelaySeconds().
private final ConfigRefreshProperties configRefreshProperties;
/**
 * Refreshes the given pods and collects the resulting tuples into a map of
 * {@code T1 -> T2}.
 *
 * <p>NOTE(review): the operators between {@code Flux.fromIterable(pods)} and
 * {@code collectMap(...)} are missing from this excerpt (only the tail {@code (x, delay) -> x)}
 * of one call survives). Presumably each pod is passed to {@code refreshWithRetry}, which
 * would make the map key the pod IP -- TODO confirm.
 *
 * @param pods pods to refresh
 * @return a {@code Mono} emitting a map built from the {@code Tuple2} elements of the pipeline
 */
public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) {
return Flux.fromIterable(pods)
// Tail of a two-argument combinator that keeps the first argument and drops "delay".
(x, delay) -> x)
.collectMap(Tuple2::getT1, Tuple2::getT2);
/**
 * Attempts to refresh a single pod and pairs the outcome with the pod's IP, retrying on
 * errors with a growing delay.
 *
 * <p>NOTE(review): the calls that signal the emitter (success/error), the body of the
 * {@code catch} block and several closing braces are missing from this excerpt.
 *
 * @param pod the pod to refresh
 * @return a {@code Mono} emitting {@code (pod IP, result)}
 */
private Mono<Tuple2<String, Boolean>> refreshWithRetry(Item pod) {
return Mono.<Boolean>create(emitter -> {
try {
log.info("Attempting to refresh pod: {}.", pod);
// NOTE(review): refresh(...) goes through RestOperations, presumably a blocking call made
// on the subscribing thread -- confirm.
ResponseEntity<String> tryRefresh = refresh(pod);
// Any non-2xx response is logged as a failed refresh.
if (!tryRefresh.getStatusCode().is2xxSuccessful()) {
log.error("Failed to refresh pod: {}.", pod);
} else {
log.info("Successfully refreshed pod: {}.", pod);
} catch (Exception e) {
// Pair the boolean outcome with the pod's IP.
.map(b -> Tuples.of(pod.getIp(), b))
.log(getClass().getName(), Level.FINE)
.retryWhen(errors -> {
int maxRetries = configRefreshProperties.getMaxRetries();
// Number each error with an attempt counter. range(1, maxRetries + 1) yields
// 1..maxRetries+1, i.e. one value more than the retry limit, so the attempt past the
// limit still reaches the flatMap below.
return errors.zipWith(Flux.range(1, maxRetries + 1), (ex, i) -> Tuples.of(ex, i))
.flatMap(t -> {
Integer retryCount = t.getT2();
// Retry only while within the limit and the error is classified as retryable.
if (retryCount <= maxRetries && shouldRetry(t.getT1())) {
int retryDelaySeconds = configRefreshProperties.getRetryDelaySeconds();
// Delay in seconds is retryDelaySeconds raised to the power of the attempt number.
long delay = (long) Math.pow(retryDelaySeconds, retryCount);
return Mono.delay(Duration.ofSeconds(delay));
// NOTE(review): the closing brace of the if-block is missing from this excerpt; the two
// lines below are presumably the "give up" path -- confirm.
log.error("Done retrying to refresh pod: {}.", pod);
return Mono.<Long>empty();
/**
 * POSTs to the pod's refresh endpoint with no request body.
 *
 * @param pod the pod to refresh
 * @return the response entity with the body read as a {@code String}
 */
private ResponseEntity<String> refresh(Item pod) {
return restOperations.postForEntity(buildRefreshEndpoint(pod), null, String.class);
/**
 * Builds the refresh URL for a pod from the template
 * {@code http://{podIp}:{containerPort}/refresh}.
 *
 * <p>NOTE(review): the final call that turns the expanded URI components into a
 * {@code String} is missing from this excerpt.
 *
 * @param pod the pod whose IP and port fill the template
 * @return the refresh endpoint URL
 */
private String buildRefreshEndpoint(Item pod) {
return UriComponentsBuilder.fromUriString("http://{podIp}:{containerPort}/refresh")
// Template variables are filled positionally: IP first, then port.
.buildAndExpand(pod.getIp(), pod.getPort())
/**
 * Decides whether a failed refresh attempt should be retried.
 *
 * <p>NOTE(review): the ends of both lookup expressions are missing from this excerpt;
 * presumably each finishes by testing whether a matching cause was found -- confirm.
 *
 * @param t the error raised by the refresh attempt
 * @return {@code true} for a timeout, or for any error not flagged as a 4xx client error
 */
private boolean shouldRetry(Throwable t) {
// Look for an HttpClientErrorException in t and keep it only if it is a 4xx error.
boolean clientError = ThrowableAnalyzer.getFirstOfType(t, HttpClientErrorException.class)
.filter(s -> s.is4xxClientError())
// Look for a TimeoutException in t.
boolean timeoutError = ThrowableAnalyzer.getFirstOfType(t, TimeoutException.class)
// Timeouts are always retried; otherwise retry unless it was a 4xx client error.
return timeoutError || !clientError;
问题是：每个组的日志语句“Attempting to refresh pod”都是按顺序依次输出的（并且都在同一个 timer-1 线程上），如下所示：
2017-02-07 10:12:55.348 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=news, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.357 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=news, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.358 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=parking, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.363 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=parking, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.364 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=localsearch, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.368 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=localsearch, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.369 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=auth, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.372 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=auth, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.373 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=log, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.377 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=log, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.378 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=fuel, ip=, port=8888, podPhase=Running).
2017-02-07 10:12:55.381 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=fuel, ip=, port=8888, podPhase=Running).