这是我们如何“解决”这个问题的。这基本上是mike
在接受的答案中所写的方法。
在我们的例子中,消息的大小变化很小,因此我们知道处理一个批处理需要多少时间。所以简而言之,我们:
- 更改了
Trigger.Once()
with,Trigger.ProcessingTime(<ms>)
因为maxOffsetsPerTrigger
适用于此模式
awaitTermination(<ms>)
通过调用来模仿杀死这个正在运行的查询Trigger.Once()
- 将处理间隔设置为大于终止间隔,以便恰好适合处理一个“批次”
val kafkaOptions = Map[String, String](
"kafka.bootstrap.servers" -> "localhost:9092",
"failOnDataLoss" -> "false",
"subscribePattern" -> "testTopic",
"startingOffsets" -> "earliest",
"maxOffsetsPerTrigger" -> "10", // "batch" size
)
val streamWriterOptions = Map[String, String](
"checkpointLocation" -> "path/to/checkpoints",
)
val processingInterval = 30000L
val terminationInterval = 15000L
sparkSession
.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.writeStream
.options(streamWriterOptions)
.format("Console")
.trigger(Trigger.ProcessingTime(processingInterval))
.start()
.awaitTermination(terminationInterval)
这是有效的,因为将根据maxOffsetsPerTrigger
限制读取和处理第一批。说,在 10 秒内。然后开始处理第二批,但它在大约 5 秒后在操作中间终止,并且从未达到设置的 30 秒标记。但它正确存储了偏移量。在下一次运行中拾取并处理这个“被杀死”的批次。
这种方法的一个缺点是您必须大致知道处理一个“批次”需要多少时间 - 如果您设置的terminationInterval
太低,作业的输出将始终没有。
当然,如果您不关心在一次运行中处理的确切批次数,您可以轻松地将 调整为processingInterval
比terminationInterval
. 在这种情况下,您可以一次处理不同数量的批次,但仍然尊重maxOffsetsPerTrigger
.