2

因此,我们在内部使用 kafka 队列进行一些微服务的通信,也使用 zipkin 进行分布式跟踪。您能否建议如何在 zipkin 服务器中引入 kafka 跟踪以进行调试。

我遇到了brave-kafka-interceptor,但无法从提供的最小示例中使用 kafka 理解它。周围是否还有其他示例,或者使用了完全不同的库。

4

2 回答 2

0

将https://github.com/openzipkin-contrib/brave-kafka-interceptor#configuration中提到的配置添加到生产者和消费者中以配置跟踪。

一旦我们有了痕迹,我们需要将它们刷新到 zipkin UI。我们需要在 AsyncZipkinSpanHandler 对象上调用 flush 方法来将跟踪刷新到 zipkin。但是使用勇敢的 kafka 拦截器,我们无法访问该对象。

因此,我们需要在应用程序中提供一些空闲时间来刷新跟踪。基本上,如果我们的程序中有任何空闲时间,那么即使没有显式调用 flush() 方法,zipkin 也会帮助刷新跟踪。(我不确定这是否正确(关于空闲时间的刷新痕迹)。这完全来自我的观察。)

ProducerTracing.java

import brave.kafka.interceptor.TracingProducerInterceptor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerTracing {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
        properties.setProperty("zipkin.http.endpoint", "http://127.0.0.1:9411/api/v2/spans");
        properties.setProperty("zipkin.sender.type", "HTTP");
        properties.setProperty("zipkin.encoding", "JSON");
        properties.setProperty("zipkin.remote.service.name", "kafka");
        properties.setProperty("zipkin.local.service.name", "producer");
        properties.setProperty("zipkin.trace.id.128bit.enabled", "true");
        properties.setProperty("zipkin.sampler.rate", "1.0F");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
        producer.send(record);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

而在消费者端,我们不需要调用 sleep 方法来创建空闲时间。当消费者调用 poll() 方法时,我们会得到一些空闲时间,因为 poll() 方法会创建另一个线程并从 kafka 代理获取记录。因此,消费者可以将痕迹刷新到 zipkin。

消费者追踪.java

import brave.kafka.interceptor.TracingConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerTracing {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");

        properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
        properties.setProperty("zipkin.http.endpoint", "http://127.0.0.1:9411/api/v2/spans");
        properties.setProperty("zipkin.sender.type", "HTTP");
        properties.setProperty("zipkin.encoding", "JSON");
        properties.setProperty("zipkin.remote.service.name", "kafka");
        properties.setProperty("zipkin.local.service.name", "consumer");
        properties.setProperty("zipkin.trace.id.128bit.enabled", "true");
        properties.setProperty("zipkin.sampler.rate", "1.0F");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singleton("topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
        }
    }
}

现在,我们可以在 Zipkin 中观察痕迹。 观察到的痕迹

于 2021-07-21T09:45:46.797 回答
0

让它工作的最简单方法是使用千分尺库并配置千分尺以将此数据发送到 Zipkin 服务器。

使用 micrometer 启用指标非常简单,您只需添加 micrometer-core 和 spring-cloud-starter-zipkin 库。

有关配置和代码的详细信息,请参阅本教程https://www.baeldung.com/tracing-services-with-zipkin

Micrometer 将向 Zipkin 报告消费者/生产者指标

于 2021-01-25T09:19:25.817 回答