我正在尝试使用两个通过 Kafka 通信并使用 opentracing 进行跟踪的 Quarkus (2.4.1.Final) 微服务(生产者和消费者)创建最基本的工作示例。
我遵循了kafka和opentracing教程,在开发模式下运行了生产者和消费者(因此他们创建了 redpanda kafka 代理),然后尝试发出 POJO 并在消费者和生产者中记录 traceId。据我了解,这应该是开箱即用的。
POJO 可以顺利发送、序列化和反序列化。消费者收到的 kafka 消息头甚至使用该uber-trace-id字段注入了正确的原始跟踪和跨度 ID(我已经检查过调试生产者和消费者) 。
但是,由于某种原因,在记录跟踪 ID 时,它们不匹配。这就像跟踪上下文“忘记”了它通过 kafka 接收到的跨度。请注意,我的业务需求是只打印每个日志的 traceId,以便我们可以通过 kibana 跟踪打印的日志。
制作人:
package com.example;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
@Path("/hello")
public class ExampleResource {
private static final Logger log = Logger.getLogger(ExampleResource.class);
private final Emitter<Person> peopleEmitter;
public ExampleResource(@Channel("people") Emitter<Person> peopleEmitter) {this.peopleEmitter = peopleEmitter;}
@GET
@Path("/{name}")
@Produces(MediaType.APPLICATION_JSON)
public Person hello(@PathParam("name") String name) {
var p = new Person();
p.name = name;
log.info("Produced " + p.name);
peopleEmitter.send(p);
return p;
}
}
消费者:
package com.example;
import org.eclipse.microprofile.opentracing.Traced;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PeopleConsumer {
private static final Logger log = Logger.getLogger(PeopleConsumer.class);
@Traced
@Incoming("people")
public void process(Person person) {
log.info("received " + person.name);
}
}
POJO:
package com.example;
public class Person {
public String name;
}
生产者的应用配置:
quarkus.application.name=producer
quarkus.http.port=8090
quarkus.log.console.format=%d{HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, [%c{2.}] (%t) %s%e%n
mp.messaging.outgoing.people.connector=smallrye-kafka
mp.messaging.outgoing.people.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor
和消费者:
quarkus.http.port=8091
quarkus.application.name=consumer
quarkus.log.console.format=%d{HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, [%c{2.}] (%t) %s%e%n
mp.messaging.incoming.people.connector=smallrye-kafka
mp.messaging.incoming.people.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
串行器/解串器
public class PersonDeserializer extends ObjectMapperDeserializer<Person> {
public PersonDeserializer() {
super(Person.class);
}
}
public class PersonSerializer extends ObjectMapperSerializer<Person> {
}
依赖项(两者相同):
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-opentracing</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
</dependency>
</dependencies>
基本用例:
将 http://localhost:8090/hello/John 放入浏览器。您将在生产者处看到日志:
18:43:24 INFO traceId=00019292a43349df, spanId=19292a43349df, [co.ex.ExampleResource] (executor-thread-0) Produced John
而在消费者
18:43:25 INFO traceId=1756e0a24c740fa6, spanId=1756e0a24c740fa6, [co.ex.PeopleConsumer] (pool-1-thread-1) received John
请注意跟踪 ID 不同。我不确定我还应该做什么/配置...