我正在尝试使用 Java SDK，通过 S3 Select 从 S3 中的对象扫描并获取大量数据。我使用 `ScanRange` 类为 S3 对象指定 `start` 和 `end` 字节范围。该 S3 对象是一个 9MB 的 Parquet 文件，未压缩大小为 84MB。
在 `SelectObjectContentRequest` 中不设置 `scanRange` 选项来获取该 S3 对象时，我得到了一个 84MB 的文件，符合预期。
但是，即使设置了 `scanRange`，无论我提供的 `start` 和 `end` 字节是多少，得到的仍然是整个 84MB 的文件。
以下是我正在使用的代码片段:
/**
 * Runs Amazon S3 Select SQL queries against Parquet objects and writes the JSON output to a local
 * file.
 *
 * <p>NOTE(review): {@code log} and {@code copy(InputStream, OutputStream)} are not declared in this
 * class as shown; presumably they come from a Lombok {@code @Slf4j} annotation and a static import
 * such as {@code com.amazonaws.util.IOUtils.copy} — confirm.
 */
public class S3SelectParquetParser {

    /**
     * Executes the S3 Select query described by {@code s3SelectParserRequest} and writes the
     * returned JSON records to {@code s3SelectParserRequest.getOutputFile()}.
     *
     * @param s3Client client used to issue the SelectObjectContent call
     * @param s3SelectParserRequest bucket, key, SQL query, progress flag, optional scan range and
     *     output file
     * @return the records stream that was copied to the output file; it has already been fully
     *     read and is closed together with the S3 result, so it is returned only for backward
     *     compatibility
     * @throws IOException if the output file cannot be opened or written, if closing the S3 result
     *     fails, or if S3 ended the stream without an End event (incomplete result)
     */
    public Object getS3SelectResponse(AmazonS3 s3Client, S3SelectParserRequest s3SelectParserRequest)
            throws IOException {
        SelectObjectContentRequest selectRequest = gets3SelectRequestObject(s3SelectParserRequest);
        // The result wraps the open HTTP response; release it even if writing the output fails.
        try (SelectObjectContentResult result = s3Client.selectObjectContent(selectRequest)) {
            return massageResult(result.getPayload(), s3SelectParserRequest.getOutputFile());
        }
    }

    /**
     * Builds the SelectObjectContent request: Parquet input without whole-object compression, JSON
     * output, the SQL expression, the progress-event flag and, for ranged requests, a scan range.
     *
     * @param s3SelectParserRequest source of all request parameters
     * @return the populated request
     */
    private SelectObjectContentRequest gets3SelectRequestObject(S3SelectParserRequest s3SelectParserRequest) {
        SelectObjectContentRequest requestObject = new SelectObjectContentRequest();
        InputSerialization inputSerialization = new InputSerialization()
                .withParquet(new ParquetInput())
                .withCompressionType(CompressionType.NONE);
        OutputSerialization outputSerialization = new OutputSerialization()
                .withJson(new JSONOutput());
        RequestProgress requestProgress = new RequestProgress();
        requestProgress.setEnabled(s3SelectParserRequest.isShowProgress());

        requestObject.setBucketName(s3SelectParserRequest.getBucketName());
        requestObject.setKey(s3SelectParserRequest.getObjectPath());
        requestObject.setInputSerialization(inputSerialization);
        requestObject.setOutputSerialization(outputSerialization);
        requestObject.setExpressionType(ExpressionType.SQL);
        requestObject.setExpression(s3SelectParserRequest.getQuery());
        requestObject.setRequestProgress(requestProgress);
        if (s3SelectParserRequest.isRangedRequest()) {
            // NOTE(review): per the S3 Select documentation, a scan range on a Parquet object is
            // applied per row group: every row group that *starts* inside the range is processed
            // in full. A file written as a single row group is therefore returned entirely for any
            // range that covers its first bytes — check the object's row-group layout.
            ScanRange scanRange = new ScanRange();
            scanRange.setStart(s3SelectParserRequest.getStartByteRange());
            scanRange.setEnd(s3SelectParserRequest.getEndByteRange());
            requestObject.setScanRange(scanRange);
        }
        return requestObject;
    }

    /**
     * Copies the records stream of {@code payload} into {@code outputFile}, logging stats, progress
     * and end events as they arrive.
     *
     * @param payload event stream from the SelectObjectContent result
     * @param outputFile path of the file that receives the records (created or truncated)
     * @return the records stream, already fully read; closing it is left to the owner of the
     *     {@link SelectObjectContentResult}
     * @throws IOException if the output file cannot be opened or written, or if the stream ended
     *     without an End event
     */
    private Object massageResult(SelectObjectContentEventStream payload, String outputFile) throws IOException {
        log.info("Starting reading the inputStream");
        // Set by the visitor when the End event arrives; without it the output may be truncated.
        final AtomicBoolean isResultComplete = new AtomicBoolean(false);
        InputStream resultInputStream;
        // Let a failure to open the file propagate instead of continuing with a null stream;
        // try-with-resources flushes and closes the file on every path.
        try (OutputStream outputStream = new FileOutputStream(outputFile)) {
            resultInputStream = payload.getRecordsInputStream(
                    new SelectObjectContentEventVisitor() {
                        @Override
                        public void visit(SelectObjectContentEvent.StatsEvent event) {
                            log.info("Received Stats, Bytes Scanned: {} Bytes Processed: {}",
                                    event.getDetails().getBytesScanned(),
                                    event.getDetails().getBytesProcessed());
                        }

                        /*
                         * An End Event informs that the request has finished successfully.
                         */
                        @Override
                        public void visit(SelectObjectContentEvent.EndEvent event) {
                            isResultComplete.set(true);
                            log.info("Received End Event. Result is complete.");
                        }

                        @Override
                        public void visit(ProgressEvent event) {
                            // Parameterized-logger placeholders are {}; the former %s were never filled in.
                            log.info("{Progress}: Bytes processed: {} \t Bytes received: {}",
                                    event.getDetails().getBytesProcessed(),
                                    event.getDetails().getBytesReturned());
                        }
                    }
            );
            copy(resultInputStream, outputStream);
        }
        if (!isResultComplete.get()) {
            throw new IOException("S3 Select request was incomplete as End Event was not received.");
        }
        return resultInputStream;
    }
}
其中 `S3SelectParserRequest` 的初始化如下：
// Ranged request over bytes 0..4098; the output file name mirrors that range
// (it previously said 0-400, which did not match the configured end byte).
S3SelectParserRequest s3SelectRequestRanged = S3SelectParserRequest.builder()
        .bucketName(bucketName)
        .objectPath(bucketPath)
        .query(fullQuery)
        .showProgress(false)
        .isRangedRequest(true)
        .startByteRange(0)
        .endByteRange(4098)
        .outputFile("ranged-0-4098.json")
        .build();
`getS3SelectResponse` 的调用方式如下：
new S3SelectParquetParser().getS3SelectResponse(s3Client, s3SelectRequestRanged);