0

我一直在看 Flink Stateful Functions。它看起来非常有希望——除了一件事——我希望我只是想念它。

对于我的生活,看不到从 Python 中的 kafka 入口访问 kafka 密钥的方法。在 Java 中,我看到我可以使用反序列化器并将其有效地打包到解码的message对象中。但我找不到替代品。
在我们的例子中,键具有有价值的信息,但值中不存在。

有人遇到过这个 - 还是我错过了?

4

1 回答 1

1

我想提到的第一件事是远程功能的内置 Kafka 入口要求密钥可以解释为 UTF-8 字符串。如果这确实是您的情况,那么您只需通过以下方式获取它:

def example(context, message):
  key = context.address.id 
  ... 
  print(key) # utf8 string

如果不是这种情况,那么不幸的是,此时您将不得不使用嵌入式 SDK,在将消息转发到远程函数之前提取键和值。

于 2021-11-19T13:38:21.683 回答