我需要将 track 和 span id 添加到集群中运行的 Flink 作业中,请求流如下所示
用户 --> Rest API -> Kafka-topic-1 --> FlinkJob-1 --> Kafka-topic-2 --> FlinkJob-2 --> Consumer --> DB
我正在使用 Spring Boot 创建我的 REST API,并使用 Spring Sleuth 将轨道和跨度 ID 添加到生成的日志中,当调用 REST API 以及将消息放在 Kakfa-topic-1 上时添加轨道和跨度 ID但我无法弄清楚如何在 FlinkJob-1 和 FLinkJob-2 上消费消息时添加 track 和 span id,因为它们不在 spring 上下文中。
一种方法是跟踪和跨越 Id 到 kafka 消息头,并让 Kafka 消费者/生产者拦截器提取和记录跟踪和跨越 Id,我尝试了这个,但是我的拦截器没有被调用,因为 Flink API 使用 Flink 版本的 Kafka 客户端。
无法调用我的自定义 KafkaDeserializationSchema
public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);
@Override
public TypeInformation<String> getProducedType() {
System.out.println("************** Invoked 1");
LOGGER.debug("************** Invoked 1");
return null;
}
@Override
public boolean isEndOfStream(String nextElement) {
System.out.println("************** Invoked 2");
LOGGER.debug("************** Invoked 2");
return true;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
System.out.println("************** Invoked 3");
LOGGER.debug("************** Invoked 3");
return record.toString();
}
}
有人可以建议我如何实现相同的目标。