0

我正在尝试编写一个 Pyflink 应用程序来测量延迟和吞吐量。我的数据来自 kafka 主题的 json 对象,并DataStream使用SimpleStringSchema-class 加载到反序列化中。按照这篇文章的答案(如何在 Kafka 和 Flink 环境中测试性能?)我让 Kafka 生产者在事件中放置时间戳,但现在很难理解如何访问这些时间戳。我知道上面提到的帖子为这个问题提供了一个解决方案,但我正在努力将这个示例转移到 python,因为文档/示例很少。

这另一篇文章(Apache Flink:如何在摄取时间模式下获取事件的时间戳?)建议我应该定义一个ProcessFunction。但是,在这里我也不确定语法。我可能不得不做这样的事情(取自:https ://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job .py )

class MyProcessFunction():

    def process_element(self, value, ctx):
        result = value.get_time_stamp()
        yield result

在这里做的正确方法是什么value.get_time_stamp()?或者是否有一种更简单的方法可以解决我不知道的问题?

谢谢!

4

1 回答 1

2

当您设置一个由 Kafka 主题支持的表时,您可以为 Kafka 时间戳声明一个虚拟列,如event_time本示例中的列:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

有关使用 Kafka 标头中的元数据的更多信息,请参阅Flink 的 Kafka Table 连接器的文档。

于 2021-04-22T08:27:48.493 回答