2

在 Spark 流式传输中,我会在它们到达时获取日志。但我想一次获得至少 N 个日志。如何实现?

这个答案来看,Kafka 中似乎有这样的实用程序,但Spark 中似乎没有这样的实用程序来实现它。

4

1 回答 1

1

没有选项可让您设置从 Kafka 接收的消息数量的最小值。该选项maxOffsetsPerTrigger允许您设置消息的最大数量

如果您希望您的微批处理一次处理更多消息,您可以考虑增加触发间隔。

此外(参考您提供的链接),这也无法在 Kafka 本身中设置。您可以设置获取字节的最小数量,但不能设置消息编号的最小数量。

请注意,您可以通过结构化流中的 readStream 设置所有 Kafka 选项,kafka.Kafka 特定配置部分所述:

“Kafka自己的配置可以通过带有kafka.前缀的DataStreamReader.option进行设置,例如stream.option("kafka.bootstrap.servers", "host:port")。"

这样,您还可以使用 Consumer Configuration kafka.fetch.min.bytes。但是,在 Kafka 2.5.0 安装上使用 Spark 3.0.1 进行测试并没有任何影响。添加配置kafka.fetch.max.wait.ms时,我的测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我而言)。

查看 Spark 的KafkaDataConsumer的源代码,与纯KafkaConsumer相比,它似乎fetch没有直接考虑任何最小/最大字节数。

于 2021-01-22T07:20:55.090 回答