0

我的数据流从 gcs 读取文件并写入另一个 gcs 存储桶。

在我将 sdk 版本从 2.25.0 更新到 2.34 之前,它运行良好。

2.25 版
当输出文件不存在时,数据流将创建它。
当输出文件已经存在时,数据流将更新它。

2.34 版
当输出文件不存在时,数据流将创建它。
当输出文件已经存在时,数据流将不做任何事情,并且不会在日志中出错。

ver2.25 的行为是我想要的。
我怎么能用ver2.34做到这一点?

我的代码写入 gcs 文件如下。

xxPcollection.apply("XXX"
        TextIO.write().withTempDirectory(ValueProvider.NestedValueProvider
                .of(options.getTempDir(), new SerializableFunction<String, ResourceId>() {
                    private static final long serialVersionUID = -8758915126650660917L;

                    @Override
                    public ResourceId apply(String file) {
                        return FileBasedSink.convertToFileResourceIfPossible(file);
                    }
                })).to(options.getOutput()).withoutSharding().withSuffix("csv");

我的绒球

<!-- Adds a dependency on the Beam SDK. -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.34.0</version>
</dependency>

<!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.34.0</version>
</dependency>
4

0 回答 0