0

接下来(抱歉,有不同的用户):Kafka Key access on Ingress of a Python Flink Stateful function

我们的用例是我们使用 Kafka 标头作为跟踪和沿袭以及所需元数据的手段。看这个: https ://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/ io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java#L45-L61看起来像使用标准反序列化器,标头被丢弃。

实际上,我想要的是一种注入我自己的反序列化器的方法,该反序列化器将返回一条包含此消息和记录中的任何其他元数据的消息。我想添加类似 UniversalKafkaIngress 的东西,以便我可以使用远程模块对其进行配置。

查看代码,我可以看到我可以注册一个新的 ExtensionModule,并替换反序列化器(并创建一个自定义类型)。这是推荐的吗?如果是这样 - 是否有任何文档(如果没有,我如何配置 statefun 来获取它)?

或者,还有其他首选方法吗?

再次感谢...

4

1 回答 1

0

啊 - 发现我哪里出错了。

您可以使用标准模块 SPI 进程加载 ExtensionModule - 因此将其注册为新的“通用”入口,以便可以远程加载。我有一个错字——这就是我战斗的原因。

有一些陷阱 - 我稍后会发布一个要点来展示它是如何完成的。

于 2021-11-25T10:46:08.580 回答