0

在使用 Dataflow 运行程序运行时,使用 AfterPane.elementCountAtLeast 触发器不起作用,但在本地运行时可以正常工作。在 Dataflow 上运行时,它只生成一个窗格。

目标是从 Cloud SQL 中提取数据,转换并写入 Cloud Storage。但是,内存中存储的数据过多,因此需要将其拆分并分块写入 Cloud Storage。这就是我希望这样做的。

完整的代码是:

      val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
        .applyTransform(ParDo.of(new Translator()))
        .map(row => row.mkString("|"))
        // produce one global window with one pane per ~500 records
        .withGlobalWindow(WindowOptions(
          trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(500)),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
        ))

      val out = TextIO
        .write()
        .to("gs://test-bucket/staging")
        .withSuffix(".txt")
        .withNumShards(1)
        .withShardNameTemplate("-P-S")
        .withWindowedWrites() // gets us one file per window & pane
      pipe.saveAsCustomOutput("writer",out)

我认为问题的根源可能是JdbcIO该类被实现为 aPTransform<PBegin,PCollection>并且对 processElement 的单个调用输出了整个 SQL 查询结果:

    public void processElement(ProcessContext context) throws Exception {
      try (PreparedStatement statement =
          connection.prepareStatement(
              query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        statement.setFetchSize(fetchSize);
        parameterSetter.setParameters(context.element(), statement);
        try (ResultSet resultSet = statement.executeQuery()) {
          while (resultSet.next()) {
            context.output(rowMapper.mapRow(resultSet));
          }
        }
      }
    }
4

2 回答 2

1

最后,我有两个问题要解决:1.进程会耗尽内存,2.数据写入单个文件。

Beam 的 JdbcIO 和 Cloud SQL 无法解决问题 1,因为它使用 MySQL 驱动程序的方式。驱动程序本身在一次调用中加载整个结果executeStatement。有一种方法可以让驱动程序流式传输结果,但我必须实现自己的代码才能做到这一点。具体来说,我为 JDBC 实现了一个 BoundedSource。

对于第二个问题,我使用行号来设置每个元素的时间戳。这允许我使用 . 显式控制每个窗口中有多少行FixedWindows

于 2020-01-15T13:43:59.183 回答
0

elementCountAtLeast是一个下限,因此只制作一个窗格对于跑步者来说是一个有效的选择。

为批处理管道执行此操作时,您有几个选项:

  1. 让运行程序决定文件有多大以及写入多少分片:
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
        .applyTransform(ParDo.of(new Translator()))
        .map(row => row.mkString("|"))

      val out = TextIO
        .write()
        .to("gs://test-bucket/staging")
        .withSuffix(".txt")
      pipe.saveAsCustomOutput("writer",out)

当TextIO具有GroupByKey或支持其前面的拆分的源时,这通常是最快的选项。据我所知,JDBC 不支持拆分,因此您最好的选择是在 jdbcSelect 之后添加Reshuffle将在从数据库读取数据后启用并行处理。

  1. 使用GroupIntoBatches转换手动分组。
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
        .applyTransform(ParDo.of(new Translator()))
        .map(row => row.mkString("|"))
        .apply(GroupIntoBatches.ofSize(500))

      val out = TextIO
        .write()
        .to("gs://test-bucket/staging")
        .withSuffix(".txt")
        .withNumShards(1)
      pipe.saveAsCustomOutput("writer",out)

一般来说,这会比选项#1 慢,但它确实允许您选择每个文件写入多少记录。

还有其他几种方法可以利用它们的优缺点来做到这一点,但以上两种方法可能最接近你想要的。如果您在问题中添加更多详细信息,我可能会进一步修改此问题。

于 2020-01-09T18:52:11.213 回答