
I'm using Reactor Kafka in a Spring Boot Reactive application, together with Spring Cloud Sleuth for distributed tracing. I have configured Sleuth to use a custom propagation key taken from a header named "traceId". I have also customized the log format to print that header in my logs, so a request like this

curl -H "traceId: 123456" -X POST http://localhost:8084/parallel

will print 123456 in every log statement downstream, starting from the controller.

I now want this header to be propagated over Kafka as well. I understand that Sleuth also has built-in instrumentation for Kafka, so the header should be propagated automatically, but I can't get it to work.
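
For context, my understanding is that the Brave Kafka instrumentation Sleuth ships works by wrapping the raw Kafka clients so that the trace context is written to and read from record headers, roughly like this (only a sketch of what I expect to happen under the hood; tracing is the auto-configured brave.Tracing bean, and rawProducer/rawConsumer stand in for the actual clients):

KafkaTracing kafkaTracing = KafkaTracing.create(tracing);

// Wrapped producer injects the trace context into record headers on send
Producer<Integer, String> tracingProducer = kafkaTracing.producer(rawProducer);

// Wrapped consumer extracts the trace context from record headers on poll
Consumer<Integer, DataDocument> tracingConsumer = kafkaTracing.consumer(rawConsumer);

Since Reactor Kafka creates its own KafkaProducer and KafkaConsumer internally, I'm not sure whether this wrapping ever gets applied in my setup.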

From my controller I produce a message onto a Kafka topic and then have a separate Kafka consumer pick it up for processing.

Here is my controller:

@RestController
@RequestMapping("/parallel")
public class BasicController {

    private Logger logger = Loggers.getLogger(BasicController.class);

    KafkaProducerLoadGenerator generator = new KafkaProducerLoadGenerator();    

    @PostMapping
    public Mono<ResponseEntity> createMessage() {
        int data = (int)(Math.random()*100000);
        return Flux.just(data)
                .doOnNext(num -> logger.info("Generating document for {}", num))
                .map(generator::generateDocument)
                .flatMap(generator::sendMessage)
                .doOnNext(result ->
                        logger.info("Sent message {}, offset is {} to partition {}",
                                result.getT2().correlationMetadata(),
                                result.getT2().recordMetadata().offset(),
                                result.getT2().recordMetadata().partition()))
                .doOnError(error -> logger.error("Error in subscribe while sending message", error))
                .single()
                .map(tuple -> ResponseEntity.status(HttpStatus.OK).body(tuple.getT1()));

    }
}

Here is the code that produces the message on the Kafka topic:

@Component
public class KafkaProducerLoadGenerator {

    private static final Logger logger = Loggers.getLogger(KafkaProducerLoadGenerator.class);

    private static final String bootstrapServers = "localhost:9092";
    private static final String TOPIC = "load-topic";

    private KafkaSender<Integer, String> sender;

    private static int documentIndex = 0;

    public KafkaProducerLoadGenerator() {
        this(bootstrapServers);
    }

    public KafkaProducerLoadGenerator(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "load-generator");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<Integer, String> senderOptions = SenderOptions.create(props);
        sender = KafkaSender.create(senderOptions);
    }

    @NewSpan("generator.sendMessage")
    public Flux<Tuple2<DataDocument, SenderResult<Integer>>> sendMessage(DataDocument document) {
        return sendMessage(TOPIC, document)
                .map(result -> Tuples.of(document, result));
    }

    public Flux<SenderResult<Integer>> sendMessage(String topic, DataDocument document) {
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topic, document.getData(), document.toString());
        return sender.send(Mono.just(SenderRecord.create(producerRecord, document.getData())))
                .doOnNext(record -> logger.info("Sent message to partition={}, offset={} ", record.recordMetadata().partition(), record.recordMetadata().offset()))
                .doOnError(e -> logger.error("Error sending message " + documentIndex, e));
    }

    public DataDocument generateDocument(int data) {
        return DataDocument.builder()
                .header("Load Data")
                .data(data)
                .traceId("trace"+data)
                .timestamp(Instant.now())
                .build();
    }
}
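
If the automatic instrumentation turns out not to apply here, the fallback I'd like to avoid is copying the header onto each record by hand, something like this (a sketch only; traceIdValue is a placeholder for the incoming header value, which I would have to pass in explicitly):

ProducerRecord<Integer, String> producerRecord =
        new ProducerRecord<>(topic, document.getData(), document.toString());
// Manually copy the custom trace header onto the Kafka record
producerRecord.headers().add("traceId", traceIdValue.getBytes(StandardCharsets.UTF_8));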

My consumer looks like this:

@Component
@Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class IndividualConsumer {

    private static final Logger logger = Loggers.getLogger(IndividualConsumer.class);

    private static final String bootstrapServers = "localhost:9092";
    private static final String TOPIC = "load-topic";

    private int consumerIndex = 0;

    public ReceiverOptions<Integer, DataDocument> setupConfig(String bootstrapServers) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "load-topic-consumer-"+consumerIndex);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "load-topic-multi-consumer-2");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DataDocumentDeserializer.class);
        return ReceiverOptions.create(properties);
    }

    public void setIndex(int i) {
        consumerIndex = i;
    }

    @EventListener(ApplicationReadyEvent.class)
    public Disposable consumeMessage() {
        ReceiverOptions<Integer, DataDocument> receiverOptions = setupConfig(bootstrapServers)
                .subscription(Collections.singleton(TOPIC))
                .addAssignListener(receiverPartitions -> logger.debug("onPartitionsAssigned {}", receiverPartitions))
                .addRevokeListener(receiverPartitions -> logger.debug("onPartitionsRevoked {}", receiverPartitions));

        Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
            KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(receiverOptions);
            return receiver.receive();
        });
        Consumer<? super ReceiverRecord<Integer, DataDocument>> acknowledgeOffset = record -> record.receiverOffset().acknowledge();
        return messages
                .publishOn(Schedulers.newSingle("Parallel-Consumer"))
                .doOnError(error -> logger.error("Error in the reactive chain", error))
                .delayElements(Duration.ofMillis(100))
                .doOnNext(record -> {
                    logger.info("Consumer {}: Received from partition {}, offset {}, data with index {}",
                            consumerIndex,
                            record.receiverOffset().topicPartition(),
                            record.receiverOffset().offset(),
                            record.value().getData());
                })
                .doOnNext(acknowledgeOffset)
                .doOnError(error -> logger.error("Error receiving record", error))
                .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
                .subscribe();
    }
}
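
On the consumer side, I would expect to be able to continue the trace from the record headers using the KafkaTracing bean, roughly like this (a sketch; kafkaTracing and tracer would be autowired, and it assumes the headers are actually present on the record):

Span span = kafkaTracing.nextSpan(record).name("load-topic-consume").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
    // Anything logged here should carry the propagated traceId
    logger.info("Processing data with index {}", record.value().getData());
} finally {
    span.finish();
}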

I expected Sleuth to automatically carry both the built-in Brave trace and the custom header over to the consumer, so that the trace covers the whole transaction.

However, I have two problems:

  1. The generator bean does not get the same trace as the controller. It uses a different (and new) trace for every message it sends.
  2. The trace is not propagated from the Kafka producer to the Kafka consumer.

I can work around #1 by replacing the generator bean with a plain Java class and instantiating it in the controller. However, that means I can no longer autowire other dependencies, and in any case it does not solve #2.

I can see that an instance of the brave.kafka.clients.KafkaTracing bean is loaded, so I know Spring has created it. However, the instrumentation does not appear to be working. I inspected the topic with Kafka Tool and no headers are populated on any of the messages. In fact, the consumer has no trace at all.

2020-05-06 23:57:32.898     INFO  parallel-consumer:local [123-21922,578c510e23567aec,578c510e23567aec] 8180 --- [reactor-http-nio-3] rja.parallelconsumers.BasicController    : Generating document for 23965
2020-05-06 23:57:32.907     INFO  parallel-consumer:local [52e02d36b59c5acd,52e02d36b59c5acd,52e02d36b59c5acd] 8180 --- [single-11] r.p.kafka.KafkaProducerLoadGenerator     : Sent message to partition=17, offset=0 
2020-05-06 23:57:32.908     INFO  parallel-consumer:local [123-21922,578c510e23567aec,578c510e23567aec] 8180 --- [single-11] rja.parallelconsumers.BasicController    : Sent message 23965, offset is 0 to partition 17
2020-05-06 23:57:33.012     INFO  parallel-consumer:local [-,-,-] 8180 --- [parallel-5] r.parallelconsumers.IndividualConsumer   : Consumer 8: Received from partition load-topic-17, offset 0, data with index 23965

In the log above, [123-21922,578c510e23567aec,578c510e23567aec] is [custom-trace-header, brave traceId, brave spanId].

What am I missing?
