0

我使用 Apache Flink 在 HDFS 上创建了一些存档数据文件,生成的文件名具有类似 part-{parallel-task}-{count} 的模式,但我期望应该有“.gz”后缀,可以由 Apache Spark 直接加载。

我找不到任何 API 来为 Apache Flink 中 BucketingSink 生成的最终完成文件添加后缀,但只能为 InProgress、Pending 和 ValidLength 状态添加后缀。任何人都可以帮忙吗? HDFS 连接器Java API

4

1 回答 1

0

据我所知,没有使用默认 BucketingSink 添加后缀的选项。

一种选择是不使用检查点并将挂起的后缀设置为所需的后缀。但是由于在大多数情况下都需要检查点,所以这不是最佳选择。

我的解决方案是创建一个 BucketingSinkWithSuffix 实现,它几乎是默认BucketingSink的精确副本。唯一需要更改的是为可以在构造函数中设置的后缀添加一个成员变量,并调整创建基本路径的方式。

这是我对构造函数的实现:

    public BucketingSinkWithSuffix(String basePath, String suffix) {
    this.basePath = basePath;
    this.bucketer = new DateTimeBucketer<>();
    this.writerTemplate = new StringWriter<>();
    this.partSuffix = suffix;
}

并用于生成基本路径(第 523 和 528 行):

partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + partSuffix);
于 2018-01-29T10:18:32.050 回答