我正在尝试编写一个 Pyflink 应用程序来测量延迟和吞吐量。我的数据来自 kafka 主题的 json 对象,并DataStream
使用SimpleStringSchema
-class 加载到反序列化中。按照这篇文章的答案(如何在 Kafka 和 Flink 环境中测试性能?)我让 Kafka 生产者在事件中放置时间戳,但现在很难理解如何访问这些时间戳。我知道上面提到的帖子为这个问题提供了一个解决方案,但我正在努力将这个示例转移到 python,因为文档/示例很少。
这另一篇文章(Apache Flink:如何在摄取时间模式下获取事件的时间戳?)建议我应该定义一个ProcessFunction
。但是,在这里我也不确定语法。我可能不得不做这样的事情(取自:https ://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job .py )
class MyProcessFunction():
def process_element(self, value, ctx):
result = value.get_time_stamp()
yield result
在这里做的正确方法是什么value.get_time_stamp()
?或者是否有一种更简单的方法可以解决我不知道的问题?
谢谢!