我正在使用队列(Apache Pulsar)中的跟踪日志。我使用 5 keyedPrcoessFunction 并最终将有效负载下沉到 Postgres Db。我需要为每个 keyedProcessFunction 订购每个 customerId。现在我通过
Datasource.keyBy(fooKeyFunction).process(processA).keyBy(fooKeyFunction).process(processB).keyBy(fooKeyFunction).process(processC).keyBy(fooKeyFunction).process(processE).keyBy(fooKeyFunction).sink(fooSink).
processFunctionC 非常耗时,在最坏的情况下需要 30 秒才能完成。这会导致背压。我尝试为 processFunctionC 分配更多插槽,但我的吞吐量从未保持不变。它主要保持<每秒4条消息。
每个 processFunction 的当前插槽是
processFunctionA: 3
processFunctionB: 30
processFunctionc: 80
processFunctionD: 10
processFunctionC: 10
在 Flink UI 中,它显示从 processB 开始的背压,这意味着 C 非常慢。有没有办法在源本身使用应用分区逻辑并将每个任务的相同插槽分配给每个 processFunction。例如:
dataSoruce.magicKeyBy(fooKeyFunction).setParallelism(80).process(processA).process(processB).process(processC).process(processE).sink(fooSink).
这将导致仅在少数任务中发生背压,并且不会扭曲由多个 KeyBy 引起的背压。
我能想到的另一种方法是将我的所有 processFunction 和 sink 组合成单个 processFunction 并将所有这些逻辑应用到 sink 本身中。