我有两个通过 Kafka 相互交互的微服务,一个发布消息,另一个消费消息。发布者和消费者都在 Quarkus (1.12.0.Final) 上运行,并使用响应式消息传递和 Mutiny。
制片人:
package myproducer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
@ApplicationScoped
public class Publisher {
@Channel("mytopic")
@Inject
public Emitter<MyAvro> myTopic;
@Override
public Uni<Void> publish(MyModel model) {
MyAvro avro = MyModelMapper.INSTANCE.modelToAvro(model);
return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
.addMetadata(toOutgoingKafkaRecordMetadata(avro))
.withAck(() -> {
e.complete(null);
return CompletableFuture.completedFuture(null);
})));
}
}
消费者:
package myconsumer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class Consumer {
@Incoming("mytopic")
public Uni<Void> consume(IncomingKafkaRecord<String, MyAvro> message) {
MyModel model = MyModelMapper.INSTANCE.avroToModel(message.getPayload());
return ...;
}
}
依赖关系:包括人工制品等
- quarkus-smallrye-reactive-messaging-kafka
- quarkus-resteasy-mutiny
- quarkus-smallrye-opentracing
- 夸库斯叛变
- opentracing-kafka-client
Quarkus 配置(application.properties):包括其他
quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n
mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
使用此设置,根本不会记录 traceId 或 spanId(即使它们应该根据 Quarkus 的“使用 OpenTracing”指南)。只有在添加@org.eclipse.microprofile.opentracing.Traced 后才设置了traceId 和spanId,但两者在生产者和消费者上完全不相关。
我对照前面提到的 Quarkus 指南“使用 OpenTracing”检查了我的 opentracing 配置,但没有发现我这边配置错误的提示。在阅读了有关与 Mutiny 一起使用时依赖于 ThreadLocals 的 Quarkus 扩展中的问题的讨论后,我将人工制品 quarkus-smallrye-context-propagation 添加到我的依赖项中,但无济于事。
我怀疑这个问题可能与https://github.com/quarkusio/quarkus/issues/15182有关,尽管它是关于反应性路由而不是反应性消息传递。
有任何想法吗?