我用org.apache.kafka:kafka-streams:0.10.0.1
我正在尝试使用基于时间序列的流,该流似乎不会触发KStream.Process()
触发(“标点符号”)。(请参阅此处以供参考)
在KafkaStreams
配置中,我传递了这个参数(等等):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
这里,EventTimeExtractor
是一个自定义时间戳提取器(实现org.apache.kafka.streams.processor.TimestampExtractor
),用于从 JSON 数据中提取时间戳信息。
当每条新记录被拉入时,我希望这会调用我的对象(派生自TimestampExtractor
)。有问题的流是 2 * 10^6 记录/分钟。我已punctuate()
设置为 60 秒,但它永远不会触发。我知道数据非常频繁地通过这个跨度,因为它会拉动旧值来迎头赶上。
事实上,它根本不会被调用。
- 这是在 KStream 记录上设置时间戳的错误方法吗?
- 这是声明此配置的错误方法吗?