11

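I want to use Micrometer to record the execution time of an async method when it eventually runs. Is there a recommended way to do this?

Example: Kafka Replying Template. I want to record the time it takes to actually execute the sendAndReceive call (which sends a message on a request topic and receives a response on a reply topic).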

    public Mono<String> sendRequest(Mono<String> request) {
        return request
            .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
            .map(pr -> {
                pr.headers()
                        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                                "reply-topic".getBytes()));
                return pr;
            })
            .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
            ... // further maps, filters, etc.

Something like

    responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr))

won't work here; it only records the time it takes to create the Supplier, not the actual execution time.
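
To make the problem concrete, here is a minimal JDK-only sketch of the difference between timing the creation of an async result and timing its completion. It is an illustration under stated assumptions, not the Kafka or Micrometer API: CompletableFuture stands in for the reactive call, System.nanoTime() stands in for a Micrometer Timer, and the names AsyncTimingPitfall, slowCall, timeCreationNanos and timeCompletionNanos are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class AsyncTimingPitfall {

    // Hypothetical stand-in for an async call such as sendAndReceive:
    // it returns a future immediately and completes it about delayMillis later.
    static CompletableFuture<String> slowCall(long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> "reply",
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Same shape as timer.record(() -> asyncCall()): the clock stops as soon as
    // the future object has been created, long before the reply arrives.
    static long timeCreationNanos(Supplier<CompletableFuture<String>> call) {
        long start = System.nanoTime();
        call.get();
        return System.nanoTime() - start;
    }

    // The clock is read inside a completion callback, so the elapsed time
    // covers the whole asynchronous round trip.
    static long timeCompletionNanos(Supplier<CompletableFuture<String>> call) {
        long start = System.nanoTime();
        return call.get()
                .thenApply(reply -> System.nanoTime() - start)
                .join();
    }
}
```

Called with () -> slowCall(300), the first helper should report a small fraction of the delay, while the second should report at least roughly the full 300 ms. What I am looking for is the Micrometer equivalent of the second helper for a Mono.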

7 Answers

3

You can simply use metrics() from Mono/Flux (see metrics() here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) and then do something like

public Mono<String> sendRequest(Mono<String> request) {
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> {
            pr.headers()
                    .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                            "reply-topic".getBytes()));
            return pr;
        })
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
        // name() identifies this sequence in the metrics that metrics() publishes
        .name("my-metricsname")
        .metrics()

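For example, in Graphite you will then see the latency measured for this call (you can read more here: How to use Micrometer timer together with webflux endpoints).

answered 2019-05-10T11:04:49.187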
2

You can use reactor.util.context.Context

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
                .flatMap(t -> Mono.subscriberContext()
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }
}
answered 2018-08-23T11:40:06.777
1

You could do something like the following:

// Mono<Something> mono = ...
Timer.Sample sample = Timer.start(Clock.SYSTEM); // or use clock of registry
return mono.doOnNext(x -> sample.stop(timer));

For the Timer.Sample documentation, see here: http://micrometer.io/docs/concepts#_storing_start_state_in_code_timer_sample_code

For a nicer approach, you could also have a look at resilience4j, where they decorate the Mono via transform: https://github.com/resilience4j/resilience4j/tree/master/resilience4j-reactor
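
As a rough illustration of the decorator idea, here is a JDK-only sketch. It is not resilience4j's or Micrometer's API. CompletableFuture stands in for the Mono, a Supplier stands in for lazy subscription, and the LongConsumer recorder is a placeholder for something like timer.record(nanos, TimeUnit.NANOSECONDS). The names TimedDecorator and timed are hypothetical. The point is that the clock starts each time the call is actually invoked, and stops on completion whether the call succeeds or fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

public class TimedDecorator {

    // Wraps a lazy async call so that every invocation is timed on its own.
    // Decorating does not start the clock; invoking the returned Supplier does.
    static <T> Supplier<CompletableFuture<T>> timed(
            Supplier<CompletableFuture<T>> call, LongConsumer recorder) {
        return () -> {
            long start = System.nanoTime();
            // whenComplete runs for both normal and exceptional completion,
            // so failed calls are recorded as well.
            return call.get().whenComplete(
                    (value, error) -> recorder.accept(System.nanoTime() - start));
        };
    }
}
```

In Reactor, the analogous shape would presumably be a function passed to transform that wraps the source in Mono.defer, starts a Timer.Sample inside the defer, and stops it in a terminal hook such as doFinally, so that each subscription gets its own sample.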

answered 2018-06-23T18:28:05.333
0

I used the following:

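private <T> Publisher<T> time(String metricName, Flux<T> publisher) {
  // defer() runs this lambda per subscription, so the start time is taken at subscribe time
  return Flux.defer(() -> {
    long before = System.currentTimeMillis();
    // Records the time elapsed since subscription for every emitted element
    return publisher.doOnNext(next -> Metrics.timer(metricName)
        .record(System.currentTimeMillis() - before, TimeUnit.MILLISECONDS));
  });
}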

So to use it in practice:

Flux.just(someValue)
  .flatMap(val -> time("myMetricName", aTaskThatNeedsTimed(val)))
  .subscribe(val -> {});
answered 2018-10-24T10:41:45.347
0

You can use metrics(), which measures the time interval between subscribe() and onComplete(). You can do it like this:

 .metrics()
 .elapsed()
 .doOnNext(tuple -> log.info("get response time: " + tuple.getT1() + "ms"))
 .map(Tuple2::getT2);
answered 2020-05-21T07:08:37.787
0

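If you are considering metrics(), be aware that it won't create a new Meter even if you call Mono.name().

Depending on your situation, you have three choices.

  1. Use metrics()
    • As noted above, this does not create a new Meter even if you call Mono.name().
  2. Record the start time, then compute the elapsed time in doOnNext.
  3. Use the subscriberContext approach that Alexander Pankin posted.

Personally, I would go with option 3.

answered 2020-06-04T05:49:55.387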
-1

It looks like recordCallable, as suggested by Brian Clozel, is the answer. I wrote a quick test to verify this:

import io.micrometer.core.instrument.Timer;
import reactor.core.publisher.Mono;

public class Capitalizer {

    private final Timer timer;

    public Capitalizer(Timer timer) {
        this.timer = timer;
    }

    public Mono<String> capitalize(Mono<String> val) {
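        return val.flatMap(v -> {
            try {
                // recordCallable times the synchronous call to toUpperCase,
                // which blocks in Thread.sleep before it returns the Mono
                return timer.recordCallable(() -> toUpperCase(v));
            } catch (Exception e) {
                // Propagate as an error signal; a flatMap mapper must not return null
                return Mono.error(e);
            }
        });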
    }

    private Mono<String> toUpperCase(String val) throws InterruptedException {
        Thread.sleep(1000);
        return Mono.just(val.toUpperCase());
    }
}

And to test this:

import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;

public class CapitalizerTest {

    private static final Logger logger =
        LoggerFactory.getLogger(CapitalizerTest.class);

    private Capitalizer capitalizer;
    private Timer timer;

    @Before
    public void setUp() {
        timer = new SimpleMeterRegistry().timer("test");
        capitalizer = new Capitalizer(timer);
    }

    @Test
    public void testCapitalize() {
        String val = "Foo";
        Mono<String> inputMono = Mono.just(val);
        Mono<String> mono = capitalizer.capitalize(inputMono);
        mono.subscribe(v -> logger.info("Capitalized {} to {}", val, v));
        assertEquals(1, timer.count());
        logger.info("Timer executed in {} ms",
            timer.totalTime(TimeUnit.MILLISECONDS));
        assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) > 1000);
    }
}

The timer reports an execution time of roughly 1004 ms with the 1000 ms delay, and 4 ms without it.

answered 2018-03-16T23:35:50.663