据我所知,没有使用默认 BucketingSink 添加后缀的选项。
一种选择是不使用检查点并将挂起的后缀设置为所需的后缀。但是由于在大多数情况下都需要检查点,所以这不是最佳选择。
我的解决方案是创建一个 BucketingSinkWithSuffix 实现,它几乎是默认BucketingSink的精确副本。唯一需要更改的是为可以在构造函数中设置的后缀添加一个成员变量,并调整创建基本路径的方式。
这是我对构造函数的实现:
public BucketingSinkWithSuffix(String basePath, String suffix) {
this.basePath = basePath;
this.bucketer = new DateTimeBucketer<>();
this.writerTemplate = new StringWriter<>();
this.partSuffix = suffix;
}
并用于生成基本路径(第 523 和 528 行):
partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + partSuffix);