在使用 Dataflow 运行程序运行时,使用 AfterPane.elementCountAtLeast 触发器不起作用,但在本地运行时可以正常工作。在 Dataflow 上运行时,它只生成一个窗格。
目标是从 Cloud SQL 中提取数据,转换并写入 Cloud Storage。但是,内存中存储的数据过多,因此需要将其拆分并分块写入 Cloud Storage。这就是我希望这样做的。
完整的代码是:
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
// produce one global window with one pane per ~500 records
.withGlobalWindow(WindowOptions(
trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(500)),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
.withNumShards(1)
.withShardNameTemplate("-P-S")
.withWindowedWrites() // gets us one file per window & pane
pipe.saveAsCustomOutput("writer",out)
我认为问题的根源可能是JdbcIO
该类被实现为 aPTransform<PBegin,PCollection>
并且对 processElement 的单个调用输出了整个 SQL 查询结果:
public void processElement(ProcessContext context) throws Exception {
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
statement.setFetchSize(fetchSize);
parameterSetter.setParameters(context.element(), statement);
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
context.output(rowMapper.mapRow(resultSet));
}
}
}
}