我的数据流从 gcs 读取文件并写入另一个 gcs 存储桶。
在我将 sdk 版本从 2.25.0 更新到 2.34 之前,它运行良好。
2.25 版
当输出文件不存在时,数据流将创建它。
当输出文件已经存在时,数据流将更新它。
2.34 版
当输出文件不存在时,数据流将创建它。
当输出文件已经存在时,数据流将不做任何事情,并且不会在日志中出错。
ver2.25 的行为是我想要的。
我怎么能用ver2.34做到这一点?
我的代码写入 gcs 文件如下。
xxPcollection.apply("XXX"
TextIO.write().withTempDirectory(ValueProvider.NestedValueProvider
.of(options.getTempDir(), new SerializableFunction<String, ResourceId>() {
private static final long serialVersionUID = -8758915126650660917L;
@Override
public ResourceId apply(String file) {
return FileBasedSink.convertToFileResourceIfPossible(file);
}
})).to(options.getOutput()).withoutSharding().withSuffix("csv");
我的绒球
<!-- Adds a dependency on the Beam SDK. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.34.0</version>
</dependency>
<!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.34.0</version>
</dependency>