2

我有两个通过 Kafka 相互交互的微服务,一个发布消息,另一个消费消息。发布者和消费者都在 Quarkus (1.12.0.Final) 上运行,并使用响应式消息传递和 Mutiny。

制片人:

package myproducer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;


@ApplicationScoped
public class Publisher {  
  @Channel("mytopic")
  @Inject
  public Emitter<MyAvro> myTopic;

  @Override
  public Uni<Void> publish(MyModel model) {
    MyAvro avro = MyModelMapper.INSTANCE.modelToAvro(model);

    return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
                                                .addMetadata(toOutgoingKafkaRecordMetadata(avro))
                                                .withAck(() -> {
                                                     e.complete(null);
                                                     return CompletableFuture.completedFuture(null);
                                                })));
  }
}

消费者:

package myconsumer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class Consumer {

  @Incoming("mytopic")
  public Uni<Void> consume(IncomingKafkaRecord<String, MyAvro> message) {
    MyModel model = MyModelMapper.INSTANCE.avroToModel(message.getPayload());

    return ...;
  }

}

依赖关系:包括人工制品等

  • quarkus-smallrye-reactive-messaging-kafka
  • quarkus-resteasy-mutiny
  • quarkus-smallrye-opentracing
  • 夸库斯叛变
  • opentracing-kafka-client

Quarkus 配置(application.properties):包括其他

quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n

mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

使用此设置,根本不会记录 traceId 或 spanId(即使它们应该根据 Quarkus 的“使用 OpenTracing”指南)。只有在添加@org.eclipse.microprofile.opentracing.Traced 后才设置了traceId 和spanId,但两者在生产者和消费者上完全不相关。

我对照前面提到的 Quarkus 指南“使用 OpenTracing”检查了我的 opentracing 配置,但没有发现我这边配置错误的提示。在阅读了有关与 Mutiny 一起使用时依赖于 ThreadLocals 的 Quarkus 扩展中的问题的讨论后,我将人工制品 quarkus-smallrye-context-propagation 添加到我的依赖项中,但无济于事。

我怀疑这个问题可能与https://github.com/quarkusio/quarkus/issues/15182有关,尽管它是关于反应性路由而不是反应性消息传递。

有任何想法吗?

4

1 回答 1

1

这个问题不容易解决,首先我将尝试解释发生了什么。

OpenTracing 具有事务和跨度的概念。span 是一个执行块(方法、数据库调用、发送到 Kafka 主题),而事务是跨越多个组件(一组 span)的分布式进程。

这里的问题是,每次创建跨度时,它都没有找到任何 OpenTracing 事务,因此它会创建一个新事务。这就是为什么您的跨度都没有相互关联的原因。

在 OpenTracing 中,当您创建跨度时,您将基于跨度上下文创建它。每个 OpenTracing 集成都会基于扩展技术创建一个跨度上下文(我没有找到更好的术语),例如,HTTP 跨度上下文是基于 HTTP 标头的,而 Kafka 跨度上下文是基于 Kafka 标头的。

因此,要关联两个跨度,您需要使用来自提供正确OpenTracing ID的底层技术的一些上下文来创建跨度上下文。

例如,要关联两个 Kafka 跨度,您需要有一个uber-trace-id标头(这是 Jaeger 中 OpenTracing id 的默认名称)和跟踪标识符(有关此标头的格式,请参见tracespan-identity)。

知道这一点,有很多事情要做。

首先,您需要在方法uber-trace-id的传出消息中添加一个 Kafka 标头,以将@Traced方法的跨度与在 Kafka 生产者拦截器中创建的跨度相关联。

Tracer tracer = GlobalTracer.get(); // you can also inject it
JaegerSpanContext spanCtx = ((JaegerSpan)tracer.activeSpan()).context();
// uber-trace-id format: {trace-id}:{span-id}:{parent-span-id}:{flags}
//see https://www.jaegertracing.io/docs/1.21/client-libraries/#tracespan-identity
var uberTraceId = spanCtx.getTraceId() + ":" +
        Long.toHexString(spanCtx.getSpanId()) + ":" +
        Long.toHexString(spanCtx.getParentId()) + ":" +
        Integer.toHexString(spanCtx.getFlags());
headers.add("uber-trace-id", openTracingId.getBytes());

然后,您需要将您的@Traced方法与传入消息的跨度相关联(如果有)。为此,最简单的方法是添加一个 CDI 拦截器,该拦截器将尝试为@Traced基于方法参数注释的所有方法创建一个跨度上下文(它将搜索Message参数)。为此,该拦截器需要在 OpenTracing 拦截器之前执行,并在拦截器上下文中设置跨度上下文。

这是我们的拦截器实现,请随意使用或根据您的需要进行调整。

public class KafkaRecordOpenTracingInterceptor {

    @AroundInvoke
    public Object propagateSpanCtx(InvocationContext ctx) throws Exception {
        for (int i = 0 ; i < ctx.getParameters().length ; i++) {
            Object parameter = ctx.getParameters()[i];

            if (parameter instanceof Message) {
                Message message = (Message) parameter;

                Headers headers = message.getMetadata(IncomingKafkaRecordMetadata.class)
                    .map(IncomingKafkaRecordMetadata::getHeaders)
                    .get();
                SpanContext spanContext = getSpanContext(headers);
                ctx.getContextData().put(OpenTracingInterceptor.SPAN_CONTEXT, spanContext);
            }
        }

        return ctx.proceed();
    }

    private SpanContext getSpanContext(Headers headers) {
        return TracingKafkaUtils.extractSpanContext(headers, GlobalTracer.get());
    }
}

此代码同时使用 Quarkus OpenTracing 扩展和 Kafka OpenTracing 贡献库。

由于添加了从当前跨度上下文创建的 OpenTracing Kafka 标头,以及从传入消息的标头创建上下文,传出消息的相关性在任何情况下都应该发生。

于 2021-03-22T15:22:25.697 回答