1

我正在使用队列(Apache Pulsar)中的跟踪日志。我使用 5 keyedPrcoessFunction 并最终将有效负载下沉到 Postgres Db。我需要为每个 keyedProcessFunction 订购每个 customerId。现在我通过

Datasource.keyBy(fooKeyFunction).process(processA).keyBy(fooKeyFunction).process(processB).keyBy(fooKeyFunction).process(processC).keyBy(fooKeyFunction).process(processE).keyBy(fooKeyFunction).sink(fooSink).

processFunctionC 非常耗时,在最坏的情况下需要 30 秒才能完成。这会导致背压。我尝试为 processFunctionC 分配更多插槽,但我的吞吐量从未保持不变。它主要保持<每秒4条消息。

每个 processFunction 的当前插槽是

processFunctionA: 3
processFunctionB: 30
processFunctionc: 80
processFunctionD: 10
processFunctionC: 10

在 Flink UI 中,它显示从 processB 开始的背压,这意味着 C 非常慢。有没有办法在源本身使用应用分区逻辑并将每个任务的相同插槽分配给每个 processFunction。例如:

dataSoruce.magicKeyBy(fooKeyFunction).setParallelism(80).process(processA).process(processB).process(processC).process(processE).sink(fooSink).

这将导致仅在少数任务中发生背压,并且不会扭曲由多个 KeyBy 引起的背压。

我能想到的另一种方法是将我的所有 processFunction 和 sink 组合成单个 processFunction 并将所有这些逻辑应用到 sink 本身中。

4

1 回答 1

1

我不认为有这样的事情存在。最接近的是DataStreamUtils.reinterpretAsKeyedStream,它在不实际在操作员之间发送任何数据的情况下重新创建 ,KeyedStream因为它使用仅在本地转发数据的分区程序。这或多或少是您想要的,但它仍然添加了分区运算符,并且在后台重新创建了KeyedStream,但它应该更简单、更快,也许它会解决您面临的问题。

如果这不能解决问题,那么我认为最好的解决方案是对操作员进行分组,以便按照您的建议最小化背压,即将所有操作员合并为一个更大的操作员,这应该最大限度地减少背压。

于 2020-06-12T23:28:21.210 回答