1

我正在使用 Apache Beam 加入多个流以及一些查找。我有 2 个场景,如果查找大小很大,我希望边输入为每个记录处理重新加载/刷新(即我将使用 where 子句查询数据库),如果查找大小较小,则重新加载/刷新一次一天。

我想知道什么是正确的方法。我不希望庞大的数据端输入吃掉所有工人的记忆。

我使用下面的代码每天刷新一次侧输入。

PCollectionView<Map<String, String>> lkp =
        p.apply(GenerateSequence.from(0)).withRate(1, Duration.standardDays(1))
            .apply(
                Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {

                      private static final long serialVersionUID = 1L;

                      @ProcessElement
                      public void process(
                          @Element Long input, OutputReceiver<Map<String, String>> o) {
                        Map<String, String> map = HiveConnection.getHiveConnection("select * from table");
                        o.output(map);
                      }
                    }))
            .apply(View.<Map<String, String>>asSingleton());

请指导我了解此类用例的最佳实践,并为我提供一些示例代码以便更好地理解。

谢谢, 戈瑟姆

4

1 回答 1

1

您正在使用正确的推荐模式进行小型日常查找。

在大的情况下,DoFn 的标注通常是推荐的模式,而不是使用 SideInput。这个旧博客包含“调用外部服务以丰富数据”模式的示例。

常见 Cloud Dataflow 用例模式指南,第 1 部分

我会尝试找时间将此模式添加到 Beam 模式页面中:

光束模式

于 2019-08-19T02:21:01.060 回答