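I am trying to use S3 Select through the Java SDK to scan and fetch a large amount of data from an object in S3. I am using the ScanRange class to supply the start and end byte range of the S3 object. The S3 object is a 9 MB Parquet file whose uncompressed size is 84 MB.

When I fetch the S3 object with a SelectObjectContentRequest that has no scanRange option, I get an 84 MB file, as expected.

However, even when I use scanRange, I still get the entire 84 MB file, no matter what start and end bytes I provide.

Here is the code snippet I am using: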

public class S3SelectParquetParser {
  public Object getS3SelectResponse(AmazonS3 s3Client, S3SelectParserRequest s3SelectParserRequest) throws IOException {
    SelectObjectContentRequest selectRequest = gets3SelectRequestObject(s3SelectParserRequest);
    SelectObjectContentResult result = s3Client.selectObjectContent(selectRequest);
    return massageResult(result.getPayload(), s3SelectParserRequest.getOutputFile());
  }

  private SelectObjectContentRequest gets3SelectRequestObject(S3SelectParserRequest s3SelectParserRequest) {
    SelectObjectContentRequest requestObject = new SelectObjectContentRequest();

    InputSerialization inputSerialization = new InputSerialization()
        .withParquet(new ParquetInput())
        .withCompressionType(CompressionType.NONE);

    OutputSerialization outputSerialization = new OutputSerialization()
        .withJson(new JSONOutput());

    RequestProgress requestProgress = new RequestProgress();
    requestProgress.setEnabled(s3SelectParserRequest.isShowProgress());

    requestObject.setBucketName(s3SelectParserRequest.getBucketName());
    requestObject.setKey(s3SelectParserRequest.getObjectPath());
    requestObject.setInputSerialization(inputSerialization);
    requestObject.setOutputSerialization(outputSerialization);
    requestObject.setExpressionType(ExpressionType.SQL);
    requestObject.setExpression(s3SelectParserRequest.getQuery());
    requestObject.setRequestProgress(requestProgress);

    if (s3SelectParserRequest.isRangedRequest()) {
      ScanRange scanRange = new ScanRange();
      scanRange.setStart(s3SelectParserRequest.getStartByteRange());
      scanRange.setEnd(s3SelectParserRequest.getEndByteRange());
      requestObject.setScanRange(scanRange);
    }
    return requestObject;
  }


  private Object massageResult(SelectObjectContentEventStream payload, String outputFile) throws IOException {
    log.info("Starting reading the inputStream");
    final AtomicBoolean isResultComplete = new AtomicBoolean(false);
    OutputStream outputStream = null;
    try {
      outputStream = new FileOutputStream(outputFile);
    } catch (Exception e) {
      log.error("Exception occurred", e);
    }
    InputStream resultInputStream = payload.getRecordsInputStream(
        new SelectObjectContentEventVisitor() {
          @Override
          public void visit(SelectObjectContentEvent.StatsEvent event) {
            System.out.println(
                "Received Stats, Bytes Scanned: " + event.getDetails().getBytesScanned()
                    + " Bytes Processed: " + event.getDetails().getBytesProcessed());
          }

          /*
           * An End Event informs that the request has finished successfully.
           */
          @Override
          public void visit(SelectObjectContentEvent.EndEvent event) {
            isResultComplete.set(true);
            System.out.println("Received End Event. Result is complete.");
          }

          @Override
          public void visit(ProgressEvent event) {
            // SLF4J-style loggers substitute "{}" placeholders, not "%s".
            log.info("Progress: Bytes processed: {} \t Bytes returned: {}", event.getDetails().getBytesProcessed(),
                event.getDetails().getBytesReturned());
          }

        }
    );
    copy(resultInputStream, outputStream);
    return resultInputStream;
  }
}

where S3SelectParserRequest is initialized as:

  S3SelectParserRequest s3SelectRequestRanged = S3SelectParserRequest.builder()
        .bucketName(bucketName)
        .objectPath(bucketPath)
        .query(fullQuery)
        .showProgress(false)
        .isRangedRequest(true)
        .startByteRange(0)
        .endByteRange(4098)
        .outputFile("ranged-0-400.json")
        .build();

and getS3SelectResponse is called as:

new S3SelectParquetParser().getS3SelectResponse(s3Client, s3SelectRequestRanged);

1 Answer

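I figured it out. For a Parquet object, S3 Select with a scanRange returns the data of every row group that starts within the supplied byte range. The first 4 bytes of a Parquet object hold the header, and the first row group starts at the 5th byte. Since my first row group starts right after byte 0 (the startByteRange I supplied), it is returned no matter what endByteRange I supply beyond 4 (any other row groups that start within the specified range would be returned as well). My Parquet file contains only one row group, so whatever range I supplied, I got back the whole file, that is, the first row group.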
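The behaviour described above can be illustrated with a small stand-alone sketch. It does not call S3 or parse Parquet. It only models the rule as I observed it: a row group is returned when its starting byte offset falls inside the scan range. The class name ScanRangeRowGroups, the method selectedRowGroups, the example offsets, and the choice to treat the end of the range as inclusive are all assumptions made for illustration, not part of the AWS SDK.

```java
import java.util.ArrayList;
import java.util.List;

public class ScanRangeRowGroups {

    /**
     * Hypothetical model of the observed behaviour: returns the indexes of the
     * row groups whose starting byte offset lies within [start, end].
     * Treating "end" as inclusive is an assumption of this sketch.
     */
    static List<Integer> selectedRowGroups(long[] rowGroupStarts, long start, long end) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < rowGroupStarts.length; i++) {
            if (rowGroupStarts[i] >= start && rowGroupStarts[i] <= end) {
                selected.add(i);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // A file like mine: one row group starting right after the 4-byte header.
        long[] singleRowGroup = {4L};
        System.out.println(selectedRowGroups(singleRowGroup, 0L, 4098L));       // [0]
        System.out.println(selectedRowGroups(singleRowGroup, 0L, 50_000_000L)); // [0]

        // A made-up file with three row groups at illustrative offsets.
        long[] threeRowGroups = {4L, 30_000_000L, 60_000_000L};
        System.out.println(selectedRowGroups(threeRowGroups, 0L, 4098L));                // [0]
        System.out.println(selectedRowGroups(threeRowGroups, 25_000_000L, 70_000_000L)); // [1, 2]
    }
}
```

With a single row group, every range that starts at 0 and ends at or after byte 4 selects the whole file, which matches what I saw. Under this model, scan ranges only reduce the returned data if the Parquet file is written with several row groups.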

answered 2020-11-26T05:55:11.663