在 Spark 流式传输中,我会在它们到达时获取日志。但我想一次获得至少 N 个日志。如何实现?
从这个答案来看,Kafka 中似乎有这样的实用程序,但Spark 中似乎没有这样的实用程序来实现它。
在 Spark 流式传输中,我会在它们到达时获取日志。但我想一次获得至少 N 个日志。如何实现?
从这个答案来看,Kafka 中似乎有这样的实用程序,但Spark 中似乎没有这样的实用程序来实现它。
没有选项可让您设置从 Kafka 接收的消息数量的最小值。该选项maxOffsetsPerTrigger
允许您设置消息的最大数量。
如果您希望您的微批处理一次处理更多消息,您可以考虑增加触发间隔。
此外(参考您提供的链接),这也无法在 Kafka 本身中设置。您可以设置获取字节的最小数量,但不能设置消息编号的最小数量。
请注意,您可以通过结构化流中的 readStream 设置所有 Kafka 选项,kafka.
如Kafka 特定配置部分所述:
“Kafka自己的配置可以通过带有kafka.前缀的DataStreamReader.option进行设置,例如stream.option("kafka.bootstrap.servers", "host:port")。"
这样,您还可以使用 Consumer Configuration kafka.fetch.min.bytes
。但是,在 Kafka 2.5.0 安装上使用 Spark 3.0.1 进行测试并没有任何影响。添加配置kafka.fetch.max.wait.ms
时,我的测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我而言)。
查看 Spark 的KafkaDataConsumer的源代码,与纯KafkaConsumer相比,它似乎fetch
没有直接考虑任何最小/最大字节数。