接下来(抱歉,有不同的用户):Kafka Key access on Ingress of a Python Flink Stateful function
我们的用例是我们使用 Kafka 标头作为跟踪和沿袭以及所需元数据的手段。看这个: https ://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/ io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java#L45-L61看起来像使用标准反序列化器,标头被丢弃。
实际上,我想要的是一种注入我自己的反序列化器的方法,该反序列化器将返回一条包含此消息和记录中的任何其他元数据的消息。我想添加类似 UniversalKafkaIngress 的东西,以便我可以使用远程模块对其进行配置。
查看代码,我可以看到我可以注册一个新的 ExtensionModule,并替换反序列化器(并创建一个自定义类型)。这是推荐的吗?如果是这样 - 是否有任何文档(如果没有,我如何配置 statefun 来获取它)?
或者,还有其他首选方法吗?
再次感谢...