我在 Spring Boot Reactive 应用程序中使用 Reactor Kafka,并使用 Spring Cloud Sleuth 进行分布式跟踪。我已将 Sleuth 设置为使用名为“traceId”的标头中的自定义传播密钥。我还自定义了日志格式以在我的日志中打印标题,所以像这样的请求
curl -H "traceId: 123456" -X POST http://localhost:8084/parallel
123456
将从控制器开始在下游任何地方打印每个日志。
我现在也希望通过 Kafka 传播此标头。我知道 Sleuth 也为 Kafka 内置了仪器,因此标题应该自动传播,但是我无法让它工作。
从我的控制器中,我在 Kafka 主题上生成一条消息,然后让另一个 Kafka 消费者拿起它进行处理。
这是我的控制器:
@RestController
@RequestMapping("/parallel")
public class BasicController {
private Logger logger = Loggers.getLogger(BasicController.class);
KafkaProducerLoadGenerator generator = new KafkaProducerLoadGenerator();
@PostMapping
public Mono<ResponseEntity> createMessage() {
int data = (int)(Math.random()*100000);
return Flux.just(data)
.doOnNext(num -> logger.info("Generating document for {}", num))
.map(generator::generateDocument)
.flatMap(generator::sendMessage)
.doOnNext(result ->
logger.info("Sent message {}, offset is {} to partition {}",
result.getT2().correlationMetadata(),
result.getT2().recordMetadata().offset(),
result.getT2().recordMetadata().partition()))
.doOnError(error -> logger.error("Error in subscribe while sending message", error))
.single()
.map(tuple -> ResponseEntity.status(HttpStatus.OK).body(tuple.getT1()));
}
}
这是生成有关 Kafka 主题的消息的代码
@Component
public class KafkaProducerLoadGenerator {
private static final Logger logger = Loggers.getLogger(KafkaProducerLoadGenerator.class);
private static final String bootstrapServers = "localhost:9092";
private static final String TOPIC = "load-topic";
private KafkaSender<Integer, String> sender;
private static int documentIndex = 0;
public KafkaProducerLoadGenerator() {
this(bootstrapServers);
}
public KafkaProducerLoadGenerator(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "load-generator");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<Integer, String> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions);
}
@NewSpan("generator.sendMessage")
public Flux<Tuple2<DataDocument, SenderResult<Integer>>> sendMessage(DataDocument document) {
return sendMessage(TOPIC, document)
.map(result -> Tuples.of(document, result));
}
public Flux<SenderResult<Integer>> sendMessage(String topic, DataDocument document) {
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topic, document.getData(), document.toString());
return sender.send(Mono.just(SenderRecord.create(producerRecord, document.getData())))
.doOnNext(record -> logger.info("Sent message to partition={}, offset={} ", record.recordMetadata().partition(), record.recordMetadata().offset()))
.doOnError(e -> logger.error("Error sending message " + documentIndex, e));
}
public DataDocument generateDocument(int data) {
return DataDocument.builder()
.header("Load Data")
.data(data)
.traceId("trace"+data)
.timestamp(Instant.now())
.build();
}
}
我的消费者看起来像这样:
@Component
@Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class IndividualConsumer {
private static final Logger logger = Loggers.getLogger(IndividualConsumer.class);
private static final String bootstrapServers = "localhost:9092";
private static final String TOPIC = "load-topic";
private int consumerIndex = 0;
public ReceiverOptions setupConfig(String bootstrapServers) {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "load-topic-consumer-"+consumerIndex);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "load-topic-multi-consumer-2");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DataDocumentDeserializer.class);
return ReceiverOptions.create(properties);
}
public void setIndex(int i) {
consumerIndex = i;
}
@EventListener(ApplicationReadyEvent.class)
public Disposable consumeMessage() {
ReceiverOptions<Integer, DataDocument> receiverOptions = setupConfig(bootstrapServers)
.subscription(Collections.singleton(TOPIC))
.addAssignListener(receiverPartitions -> logger.debug("onPartitionsAssigned {}", receiverPartitions))
.addRevokeListener(receiverPartitions -> logger.debug("onPartitionsRevoked {}", receiverPartitions));
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(receiverOptions);
return receiver.receive();
});
Consumer<? super ReceiverRecord<Integer, DataDocument>> acknowledgeOffset = record -> record.receiverOffset().acknowledge();
return messages
.publishOn(Schedulers.newSingle("Parallel-Consumer"))
.doOnError(error -> logger.error("Error in the reactive chain", error))
.delayElements(Duration.ofMillis(100))
.doOnNext(record -> {
logger.info("Consumer {}: Received from partition {}, offset {}, data with index {}",
consumerIndex,
record.receiverOffset().topicPartition(),
record.receiverOffset().offset(),
record.value().getData());
})
.doOnNext(acknowledgeOffset)
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
}
}
我希望 Sleuth 自动将内置的 Brave 跟踪和自定义标头传递给消费者,以便跟踪覆盖整个事务。
但是我有两个问题。
- 生成器 bean 没有得到与控制器中的相同的跟踪。它对发送的每条消息使用不同的(和新的)跟踪。
- 跟踪不会从 Kafka 生产者传播到 Kafka 消费者。
我可以通过用一个简单的 Java 类替换生成器 bean 并在控制器中实例化它来解决上面的 #1。然而,这意味着我不能自动装配其他依赖项,而且无论如何它都不能解决#2。
我能够加载 bean 的一个实例,brave.kafka.clients.KafkaTracing
所以我知道它是由 Spring 加载的。但是,它看起来仪器不工作。我使用 Kafka Tool 检查了 Kafka 上的内容,并且没有在任何消息上填充任何标题。事实上,消费者根本没有踪迹。
2020-05-06 23:57:32.898 INFO parallel-consumer:local [123-21922,578c510e23567aec,578c510e23567aec] 8180 --- [reactor-http-nio-3] rja.parallelconsumers.BasicController : Generating document for 23965
2020-05-06 23:57:32.907 INFO parallel-consumer:local [52e02d36b59c5acd,52e02d36b59c5acd,52e02d36b59c5acd] 8180 --- [single-11] r.p.kafka.KafkaProducerLoadGenerator : Sent message to partition=17, offset=0
2020-05-06 23:57:32.908 INFO parallel-consumer:local [123-21922,578c510e23567aec,578c510e23567aec] 8180 --- [single-11] rja.parallelconsumers.BasicController : Sent message 23965, offset is 0 to partition 17
2020-05-06 23:57:33.012 INFO parallel-consumer:local [-,-,-] 8180 --- [parallel-5] r.parallelconsumers.IndividualConsumer : Consumer 8: Received from partition load-topic-17, offset 0, data with index 23965
在上面的日志中,[123-21922,578c510e23567aec,578c510e23567aec]
是[custom-trace-header, brave traceId, brave spanId]
我错过了什么?