7

我用org.apache.kafka:kafka-streams:0.10.0.1

我正在尝试使用基于时间序列的流,该流似乎不会触发KStream.Process()触发(“标点符号”)。(请参阅此处以供参考)

KafkaStreams配置中,我传递了这个参数(等等):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

这里,EventTimeExtractor是一个自定义时间戳提取器(实现org.apache.kafka.streams.processor.TimestampExtractor),用于从 JSON 数据中提取时间戳信息。

当每条新记录被拉入时,我希望这会调用我的对象(派生自TimestampExtractor)。有问题的流是 2 * 10^6 记录/分钟。我已punctuate()设置为 60 秒,但它永远不会触发。我知道数据非常频繁地通过这个跨度,因为它会拉动旧值来迎头赶上。

事实上,它根本不会被调用。

  • 这是在 KStream 记录上设置时间戳的错误方法吗?
  • 这是声明此配置的错误方法吗?
4

3 回答 3

8

2017 年 11 月更新: Kafka 1.0 中的 Kafka Streams 现在支持punctuate()流时间和处理时间(挂钟时间)行为。因此,您可以选择您喜欢的任何行为。

你的设置对我来说似乎是正确的。

您需要注意的事项:从 Kafka 0.10.0 开始,该punctuate()方法在流时间上运行(默认情况下,即基于默认时间戳提取器,流时间将意味着事件时间)。而stream-time只有在有新数据记录进来时才会提前,而stream-time提前多少由这些新记录的相关时间戳决定。

例如:

  • 假设您已设置punctuate()为每 1 分钟调用一次 = 60 * 1000(注意:1 分钟的流时间)。现在,如果碰巧在接下来的 5 分钟内没有收到任何数据,punctuate()则根本不会被调用——即使您可能期望它会被调用 5 次。为什么?同样,因为punctuate()取决于流时间,并且流时间仅基于新接收的数据记录而提前。

这可能会导致您看到的行为吗?

展望未来:Kafka 项目中已经在讨论如何使其punctuate()更加灵活,例如不仅基于stream-time(默认为event-time)而且基于processing-time.

于 2016-09-16T19:37:29.843 回答
2

你的方法似乎是正确的。比较 http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters的段落“时间戳提取器(timestamp.extractor):”

不确定,为什么不使用您的自定义时间戳提取器。看看org.apache.kafka.streams.processor.internals.StreamTask。在构造函数中应该有类似

TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);

检查您的自定义提取器是否在那里被拾取...

于 2016-09-16T18:37:40.120 回答
1

我认为这是经纪人级别的另一个问题。我使用具有更多 CPU 和 RAM 的实例重建了集群。现在我得到了我预期的结果。

注意远处的观察者:如果您的KStream应用程序行为异常,请查看您的代理并确保它们没有卡在 GC 中并且有足够的“空间”用于文件句柄、RAM 等。

也可以看看

于 2016-09-25T20:52:56.800 回答