0

我需要将 track 和 span id 添加到集群中运行的 Flink 作业中,请求流如下所示

用户 --> Rest API -> Kafka-topic-1 --> FlinkJob-1 --> Kafka-topic-2 --> FlinkJob-2 --> Consumer --> DB

我正在使用 Spring Boot 创建我的 REST API,并使用 Spring Sleuth 将轨道和跨度 ID 添加到生成的日志中,当调用 REST API 以及将消息放在 Kakfa-topic-1 上时添加轨道和跨度 ID但我无法弄清楚如何在 FlinkJob-1 和 FLinkJob-2 上消费消息时添加 track 和 span id,因为它们不在 spring 上下文中。

一种方法是跟踪和跨越 Id 到 kafka 消息头,并让 Kafka 消费者/生产者拦截器提取和记录跟踪和跨越 Id,我尝试了这个,但是我的拦截器没有被调用,因为 Flink API 使用 Flink 版本的 Kafka 客户端。

无法调用我的自定义 KafkaDeserializationSchema

public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);

@Override
public TypeInformation<String> getProducedType() {
    System.out.println("************** Invoked 1");
    LOGGER.debug("************** Invoked 1");
    return null;
}

@Override
public boolean isEndOfStream(String nextElement) {
    System.out.println("************** Invoked 2");
    LOGGER.debug("************** Invoked 2");
    return true;
}

@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    System.out.println("************** Invoked 3");
    LOGGER.debug("************** Invoked 3");
    return record.toString();
}

 }

有人可以建议我如何实现相同的目标。

4

2 回答 2

1

您也可以使用 KafkaDeserializationSchema 来获取标头

为了访问Kafka消息的key、value和metadata,KafkaDeserializationSchema有如下反序列化方法T deserialize(ConsumerRecord record)。

public class Bla implements KafkaDeserializationSchema {
    @Override
    public boolean isEndOfStream(Object dcEvents) {
        return false;
    }

    @Override
    public Object deserialize(ConsumerRecord consumerRecord) throws Exception {
        return null;
    }



    @Override
    public TypeInformation<DCEvents> getProducedType() {
        return null;
    }
于 2019-11-24T13:35:06.637 回答
0

您在这里使用的是简单字符串,并且在将字节序列化为字符串时可以执行类似以下代码的操作。

public class MyDeserializationSchema  implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new String(record.value(), "UTF-8");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
  }
于 2019-12-03T12:52:39.770 回答