0

我正在开发一个流式数据流管道,该管道使用来自 PubSub 的批处理项目的消息并最终将它们写入数据存储区。为了更好的并行性,也为了及时确认从 PubSub 中提取的消息,我将批次解压缩为单独的项目,并在其后添加一个融合断路器。

所以管道看起来像这样......

PubSubIO -> 反序列化 -> 解包 -> 融合中断 -> 验证/转换 -> DatastoreIO。

这是我的融合断路器,主要是从JdbcIO 类中复制的。它使用触发器来分解全局窗口中的数据。

public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        .apply(ParDo.of(new RandomKeyFn<T>()))
        .apply(Window.<KV<Integer, T>>triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(2L))))
            .discardingFiredPanes())
        .apply(GroupByKey.<Integer, T>create())
        .apply(Values.<Iterable<T>>create())
        .apply(Flatten.<T>iterables());
  }

  private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private Random random;

    @Setup
    public void setup() {
      random = new Random();
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(KV.of(random.nextInt(), context.element()));
    }
  }
}

它大部分时间都在工作,除了在某些情况下它生成的输出数量少于输入数量,即使在流输入完成并且管道空闲十分钟之后也是如此。

如下面的 Dataflow Job 监控控制台所示。屏幕截图是在作业耗尽后拍摄的,在我等待大约 10 分钟以使数据从转换中出来之后。

在此处输入图像描述

*有人能想出一个解释吗?感觉好像融合破坏者正在阻止或丢失了一些物品。*

我注意到它只发生在数据量/数据速率很高的情况下,迫使管道在测试运行过程中扩大规模,从 25 个 n1-highmem-2 工作人员翻倍到 50 个。但是,我还没有做足够的测试来验证放大是否是重现此问题的关键。

或者触发器可能每两秒一次过于频繁地触发?

我正在使用数据流 2.0.0-beta1。作业 ID 为“2017-02-23_23_15_34-14025424484787508627”。

4

1 回答 1

1

Streaming Dataflow 中的计数器是尽力而为的措施;特别是自动缩放可能会导致更大的差异。在这种情况下,管道不应丢失数据。

于 2017-02-27T22:43:55.910 回答