8

是否有可能拥有一个运行主题中所有数据然后退出的 Kafka Streams 应用程序?

示例我正在根据日期将数据生成到主题中。消费者被 cron 启动,遍历所有可用数据,然后......做什么?我不希望它坐下来等待更多数据。假设一切都在那里,然后优雅地退出。

可能的?

4

2 回答 2

6

在 Kafka Streams(与其他流处理解决方案一样)中,它不是“数据结束”,因为它首先是流处理——而不是批处理。

不过,您可以观察 Kafka Streams 应用程序的“滞后”,如果没有滞后则将其关闭(滞后,是尚未消费的消息数)。

例如,您可以使用bin/kafka-consumer-groups.sh来检查您的 Streams 应用程序的延迟(应用程序 ID 用作消费者组 ID)。如果您想将此嵌入到您的 Streams 应用程序中,您可以使用它kafka.admin.AdminClient来获取消费者组信息。

于 2016-08-20T08:24:00.287 回答
4

您可以创建一个consumer,然后一旦它停止提取数据,您就可以调用consumer.close()。或者,如果您以后想再次投票,请稍后再consumer.pause()致电.resume

一种方法是在消费者轮询块中。如

data = consumer.poll()
if (!data.next()) {
   consumer.close()
}

请记住,poll返回ConsumerRecord<K,V>并符合Iterable接口。

于 2016-08-19T23:26:57.857 回答