1

我正在使用 Spark 3.0.2,并且我有一个流式作业,它使用来自 Kafka 的数据,触发持续时间为“1 分钟”。

我在 Spark UI 中看到定义的每 1 分钟有一个新作业,但我看到onQueryProgress每 5~6 分钟调用一次方法。我认为应该在每个微批处理之后直接调用此方法。

有没有办法控制这个持续时间并使其等于触发持续时间?

4

2 回答 2

1

inQueryProgressStreamingQueryListener的方法是在每个微批次内数据处理完毕后异步调用的。

您会看到此侦听器仅每 5~6 分钟触发一次,因为流式作业需要花时间来处理在微批处理中获取的所有数据。将触发持续时间设置为 1 分钟将使 Spark 相应地计划任务,但这并不意味着作业也能够在 1 分钟的时间范围内处理所有可用数据。

为了减少您的查询从 Kafka 获取的数据量,您可以使用 source 选项maxOffsetsPerTrigger

顺便说一句,如果您不处理任何数据,默认情况下每 10 秒调用一次此方法。如果您想避免这种情况发生,您可以执行if(event.progress.numInputRows > 0).

于 2021-04-19T20:07:18.487 回答
0

我发现我的案例的原因是该onQueryProgress方法需要 5 分钟才能完成。

正如 Mike 提到的那样,onQueryProgress它被异步调用,但我认为它使用同一个线程来调用这个方法。所以它正在等待方法调用完成再次调用它。

因此,在我的案例中,解决方案是找出为什么需要这么长时间,并使其比触发持续时间更快。

于 2021-04-21T15:48:57.443 回答