我已经使用 S3 连接器几个星期了,我想更改连接器命名每个文件的方式。我正在使用 HourlyBasedPartition,因此每个文件的路径已经足以让我找到每个文件,并且我希望文件名对于所有文件都是通用的,例如“Data.json.gzip”(具有相应的路径从分区器)。
例如,我想从这个开始:
<prefix>/<topic>/<HourlyBasedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
对此:
<prefix>/<topic>/<HourlyBasedPartition>/Data.<format>
这样做的目的是只调用一次 S3 以稍后下载文件,而不必先查找文件名然后下载。
从名为“kafka-connect-s3”的文件夹中搜索文件,我找到了这个文件: https ://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src /main/java/io/confluent/connect/s3/TopicPartitionWriter.java最后具有以下一些功能:
private RecordWriter getWriter(SinkRecord record, String encodedPartition)
throws ConnectException {
if (writers.containsKey(encodedPartition)) {
return writers.get(encodedPartition);
}
String commitFilename = getCommitFilename(encodedPartition);
log.debug(
"Creating new writer encodedPartition='{}' filename='{}'",
encodedPartition,
commitFilename
);
RecordWriter writer = writerProvider.getRecordWriter(connectorConfig, commitFilename);
writers.put(encodedPartition, writer);
return writer;
}
private String getCommitFilename(String encodedPartition) {
String commitFile;
if (commitFiles.containsKey(encodedPartition)) {
commitFile = commitFiles.get(encodedPartition);
} else {
long startOffset = startOffsets.get(encodedPartition);
String prefix = getDirectoryPrefix(encodedPartition);
commitFile = fileKeyToCommit(prefix, startOffset);
commitFiles.put(encodedPartition, commitFile);
}
return commitFile;
}
private String fileKey(String topicsPrefix, String keyPrefix, String name) {
String suffix = keyPrefix + dirDelim + name;
return StringUtils.isNotBlank(topicsPrefix)
? topicsPrefix + dirDelim + suffix
: suffix;
}
private String fileKeyToCommit(String dirPrefix, long startOffset) {
String name = tp.topic()
+ fileDelim
+ tp.partition()
+ fileDelim
+ String.format(zeroPadOffsetFormat, startOffset)
+ extension;
return fileKey(topicsDir, dirPrefix, name);
}
我不知道这是否可以根据我想要做的定制,但似乎与我的意图接近/相关。希望能帮助到你。
(也向 Github 提交了一个问题:https ://github.com/confluentinc/kafka-connect-storage-cloud/issues/369 )