1

我正在尝试使用两个通过 Kafka 通信并使用 opentracing 进行跟踪的 Quarkus (2.4.1.Final) 微服务​​(生产者和消费者)创建最基本的工作示例。

我遵循了kafkaopentracing教程,在开发模式下运行了生产者和消费者(因此他们创建了 redpanda kafka 代理),然后尝试发出 POJO 并在消费者和生产者中记录 traceId。据我了解,这应该是开箱即用的。

POJO 可以顺利发送、序列化和反序列化。消费者收到的 kafka 消息头甚至使用该uber-trace-id字段注入了正确的原始跟踪和跨度 ID(我已经检查过调试生产者和消费者) 。

但是,由于某种原因,在记录跟踪 ID 时,它们不匹配。这就像跟踪上下文“忘记”了它通过 kafka 接收到的跨度。请注意,我的业务需求是只打印每个日志的 traceId,以便我们可以通过 kibana 跟踪打印的日志。

制作人:

package com.example;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

@Path("/hello")
public class ExampleResource {
    private static final  Logger log = Logger.getLogger(ExampleResource.class);
    private final Emitter<Person> peopleEmitter;

    public ExampleResource(@Channel("people") Emitter<Person> peopleEmitter) {this.peopleEmitter = peopleEmitter;}

    @GET
    @Path("/{name}")
    @Produces(MediaType.APPLICATION_JSON)
    public Person hello(@PathParam("name") String name) {
        var p = new Person();
        p.name = name;
        log.info("Produced " + p.name);
        peopleEmitter.send(p);
        return p;
    }
}

消费者:

package com.example;

import org.eclipse.microprofile.opentracing.Traced;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PeopleConsumer {
    private static final Logger log = Logger.getLogger(PeopleConsumer.class);

    @Traced
    @Incoming("people")
    public void process(Person person) {
        log.info("received " + person.name);
    }
}

POJO:

package com.example;

public class Person {
    public String name;
}

生产者的应用配置:

quarkus.application.name=producer
quarkus.http.port=8090
quarkus.log.console.format=%d{HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, [%c{2.}] (%t) %s%e%n

mp.messaging.outgoing.people.connector=smallrye-kafka
mp.messaging.outgoing.people.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor

和消费者:

quarkus.http.port=8091
quarkus.application.name=consumer
quarkus.log.console.format=%d{HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, [%c{2.}] (%t) %s%e%n

mp.messaging.incoming.people.connector=smallrye-kafka
mp.messaging.incoming.people.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

串行器/解串器

public class PersonDeserializer extends ObjectMapperDeserializer<Person> {
    public PersonDeserializer() {
        super(Person.class);
    }
}


public class PersonSerializer extends ObjectMapperSerializer<Person> {
}

依赖项(两者相同):

    <dependencies>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-opentracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-context-propagation</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-arc</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
        </dependency>
        <dependency>
            <groupId>io.opentracing.contrib</groupId>
            <artifactId>opentracing-kafka-client</artifactId>
        </dependency>
    </dependencies>

基本用例:

将 http://localhost:8090/hello/John 放入浏览器。您将在生产者处看到日志:

18:43:24 INFO  traceId=00019292a43349df, spanId=19292a43349df, [co.ex.ExampleResource] (executor-thread-0) Produced John

而在消费者

18:43:25 INFO  traceId=1756e0a24c740fa6, spanId=1756e0a24c740fa6, [co.ex.PeopleConsumer] (pool-1-thread-1) received John

请注意跟踪 ID 不同。我不确定我还应该做什么/配置...

4

0 回答 0