我正在开发一个流式数据流管道,该管道使用来自 PubSub 的批处理项目的消息并最终将它们写入数据存储区。为了更好的并行性,也为了及时确认从 PubSub 中提取的消息,我将批次解压缩为单独的项目,并在其后添加一个融合断路器。
所以管道看起来像这样......
PubSubIO -> 反序列化 -> 解包 -> 融合中断 -> 验证/转换 -> DatastoreIO。
这是我的融合断路器,主要是从JdbcIO 类中复制的。它使用触发器来分解全局窗口中的数据。
public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> expand(PCollection<T> input) {
return input
.apply(ParDo.of(new RandomKeyFn<T>()))
.apply(Window.<KV<Integer, T>>triggering(
Repeatedly.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(2L))))
.discardingFiredPanes())
.apply(GroupByKey.<Integer, T>create())
.apply(Values.<Iterable<T>>create())
.apply(Flatten.<T>iterables());
}
private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
private Random random;
@Setup
public void setup() {
random = new Random();
}
@ProcessElement
public void processElement(ProcessContext context) {
context.output(KV.of(random.nextInt(), context.element()));
}
}
}
它大部分时间都在工作,除了在某些情况下它生成的输出数量少于输入数量,即使在流输入完成并且管道空闲十分钟之后也是如此。
如下面的 Dataflow Job 监控控制台所示。屏幕截图是在作业耗尽后拍摄的,在我等待大约 10 分钟以使数据从转换中出来之后。
*有人能想出一个解释吗?感觉好像融合破坏者正在阻止或丢失了一些物品。*
我注意到它只发生在数据量/数据速率很高的情况下,迫使管道在测试运行过程中扩大规模,从 25 个 n1-highmem-2 工作人员翻倍到 50 个。但是,我还没有做足够的测试来验证放大是否是重现此问题的关键。
或者触发器可能每两秒一次过于频繁地触发?
我正在使用数据流 2.0.0-beta1。作业 ID 为“2017-02-23_23_15_34-14025424484787508627”。