0

使用项目 Reactor 3.0.4.RELEASE。从概念上讲,在 RxJava 中也应该是相同的。

public Mono<Map<String, Boolean>> refreshPods(List<String> apps) {
    return pods(apps)
            .filter(this::isRunningAndNotThisApp)
            .groupBy(Item::getName)
            .flatMap(g -> g
                    .distinct(Item::getIp)
                    .collectList()
                    // TODO: This doesn't seem to be working as expected
                    .subscribeOn(Schedulers.newParallel("par-grp"))
                    .flatMap(client::refreshPods))
            .flatMap(m -> Flux.fromIterable(m.entrySet()))
            .collectMap(Map.Entry::getKey, Map.Entry::getValue);
}

这个想法是client.refreshPods为每个组在单独的线程中运行。

编辑:我publishOn在发布这个问题之前和这里给出的答案之后尝试过,但输出没有改变。

客户:

public class MyServiceClientImpl implements MyServiceClient {
    private final RestOperations restOperations;
    private final ConfigRefreshProperties configRefreshProperties;

    public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) {
        return Flux.fromIterable(pods)
                .zipWith(Flux.interval(Duration.ofSeconds(configRefreshProperties.getRefreshDelaySeconds())),
                        (x, delay) -> x)
                .flatMap(this::refreshWithRetry)
                .collectMap(Tuple2::getT1, Tuple2::getT2);
    }

    private Mono<Tuple2<String, Boolean>> refreshWithRetry(Item pod) {
        return Mono.<Boolean>create(emitter -> {
            try {
                log.info("Attempting to refresh pod: {}.", pod);
                ResponseEntity<String> tryRefresh = refresh(pod);

                if (!tryRefresh.getStatusCode().is2xxSuccessful()) {
                    log.error("Failed to refresh pod: {}.", pod);
                    emitter.success();
                } else {
                    log.info("Successfully refreshed pod: {}.", pod);
                    emitter.success(true);
                }
            } catch (Exception e) {
                emitter.error(e);
            }
        })
                .map(b -> Tuples.of(pod.getIp(), b))
                .log(getClass().getName(), Level.FINE)
                .retryWhen(errors -> {
                    int maxRetries = configRefreshProperties.getMaxRetries();
                    return errors.zipWith(Flux.range(1, maxRetries + 1), (ex, i) -> Tuples.of(ex, i))
                            .flatMap(t -> {
                                Integer retryCount = t.getT2();
                                if (retryCount <= maxRetries && shouldRetry(t.getT1())) {
                                    int retryDelaySeconds = configRefreshProperties.getRetryDelaySeconds();
                                    long delay = (long) Math.pow(retryDelaySeconds, retryCount);
                                    return Mono.delay(Duration.ofSeconds(delay));
                                }
                                log.error("Done retrying to refresh pod: {}.", pod);
                                return Mono.<Long>empty();
                            });
                });
    }

    private ResponseEntity<String> refresh(Item pod) {
        return restOperations.postForEntity(buildRefreshEndpoint(pod), null, String.class);
    }

    private String buildRefreshEndpoint(Item pod) {
        return UriComponentsBuilder.fromUriString("http://{podIp}:{containerPort}/refresh")
                .buildAndExpand(pod.getIp(), pod.getPort())
                .toUriString();
    }

    private boolean shouldRetry(Throwable t) {
        boolean clientError = ThrowableAnalyzer.getFirstOfType(t, HttpClientErrorException.class)
                .map(HttpClientErrorException::getStatusCode)
                .filter(s -> s.is4xxClientError())
                .isPresent();

        boolean timeoutError = ThrowableAnalyzer.getFirstOfType(t, TimeoutException.class)
                .isPresent();

        return timeoutError || !clientError;
    }
}

问题是每个组的日志语句Attempting to refresh pod都打印在同一个线程上。我在这里想念什么?

测试运行的日志

2017-02-07 10:g12:55.348  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.357  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.358  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.363  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.364  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.368  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.369  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.372  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.373  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.377  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.378  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Attempting to refresh pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.381  INFO 33905 --- [        timer-1] c.n.d.cloud.config.MyServiceClientImpl  : Successfully refreshed pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).
4

3 回答 3

1

编辑:由于您新提供的日志更加明确,并且正如大卫在您创建的问题中所提到的那样,根本原因是您在interval这里使用了一个。这会将上下文切换到默认值TimedScheduler(所有组都相同)。这就是为什么在调用之前所做的任何事情refreshPods似乎都被忽略了(工作在间隔线程上完成),但在间隔运算符之后的 publishOn/subscribeOn 应该工作。简而言之,我建议在静止subscribeOn后直接create使用。

您触发了一个阻塞行为 ( refresh(pod)),您将其包装为Monoin refreshWithRetry

除非您强烈需要在此级别上与并发无关,否则我建议您立即将subscribeOnnext 链接到create.

这样,使用它时就不足为奇了Mono:它尊重合同并且不会阻塞。像这样:

return Mono.<Boolean>create(emitter -> {
        //...
    })
.subscribeOn(Schedulers.newParallel("par-grp"))
.map(b -> Tuples.of(pod.getIp(), b))

如果您希望该方法返回与并发无关的发布者,那么您需要将更subscribeOn接近您的阻塞发布者,因此您需要扩展flatMaplambda:

.flatMap(pods -> client.refreshPods(pods)
                       .subscribeOn(Schedulers.newParallel("par-grp"))
)
于 2017-02-07T13:38:19.357 回答
0

在您publishOn之前放置的代码中flatMap。在使用异步源时,组合可观察对象的方法喜欢flatMapzip执行自己的重新调度。interval在您的情况下是这样的异步源。这就是为什么您在“计时器”线程上获得所有结果的原因。

1)publishOn 在您希望并行进行的操作之前使用。操作本身不应涉及重新调度。例如。map是好的,flatMap是坏的。

2)在它之后使用另一个publishOn来重新安排结果。否则订阅者的线程可能会干扰。

Flux.range(1, 100)
        .groupBy(i -> i % 5)
        .flatMap(group -> group
                .publishOn(Schedulers.newParallel("grp", 8))
                .map(v -> {
                    // processing here
                    String threadName = Thread.currentThread().getName();
                    logger.info("processing {} from {} on {}", v, group.key(), threadName);
                    return v;
                })
                .publishOn(Schedulers.single())
        )
        .subscribe(v -> logger.info("got {}", v));

如果您想确保所有组的项目在同一线程上运行,请参阅此答案:https ://stackoverflow.com/a/41697348/697313

于 2017-02-07T15:44:16.993 回答
0

为了完整起见,我自己发布了一个答案。在@simon-baslé 和@akarnokd 的帮助下,我做对了。以下两项工作。有关详细信息,请参见reactor-core#421

解决方案 1

zipWith(Flux.interval(Duration.ofSeconds(groupMemberDelaySeconds)),
    (x, delay) -> x)
.publishOn(Schedulers.newParallel("par-grp"))
.flatMap(this:: refreshWithRetry)

解决方案 2

zipWith(Flux.intervalMillis(1000 * groupMemberDelaySeconds, Schedulers.newTimer("par-grp")),
    (x, delay) -> x)
.flatMap(this:: refreshWithRetry)

方法中不需要subscribeOn或者publishOn是必需的refreshPods

于 2017-02-09T01:28:52.400 回答