
I'm getting a java.io.IOException: s3n://bucket-name : 400 : Bad Request error while loading Redshift data through the spark-redshift library:

Both the Redshift cluster and the S3 bucket are in the Mumbai region (ap-south-1).

Here is the full error stack trace:

2017-01-13 13:14:22 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, master): java.io.IOException: s3n://bucket-name : 400 : Bad Request
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:453)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
            at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
            at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476)
            at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:115)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:92)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader$1.apply(RedshiftFileFormat.scala:80)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:279)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:263)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
            at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
            at org.apache.spark.scheduler.Task.run(Task.scala:86)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.jets3t.service.impl.rest.HttpException: 400 Bad Request
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
            ... 30 more

Here is my Java code:

SparkContext sparkContext = SparkSession.builder().appName("CreditModeling").getOrCreate().sparkContext();
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", fs_s3a_awsSecretAccessKey);
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");

SQLContext sqlContext=new SQLContext(sparkContext);
Dataset dataset= sqlContext
        .read()
        .format("com.databricks.spark.redshift")
        .option("url", redshiftUrl)
        .option("query", query)
        .option("aws_iam_role", aws_iam_role)
        .option("tempdir", "s3a://bucket-name/temp-dir")
        .load();

I was able to resolve the problem in Spark local mode by making the following changes (referring to this):

1) I replaced the jets3t jar with version 0.9.4

2) I changed the jets3t configuration properties to support AWS Signature Version 4 buckets, as shown below (a quick way to verify the override is sketched after the snippet):

Jets3tProperties myProperties = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");

But now, when I try to run the job in cluster mode (Spark standalone with Mesos as the resource manager), the error shows up again :(

Any help would be appreciated!


2 Answers


Actual problem:

Updating the Jets3tProperties at runtime to support AWS S3 Signature Version 4 worked in local mode but not in cluster mode, because the properties were only being updated on the driver's JVM and not on any of the executor JVMs.

Solution:

Referring to this link, I found a workaround to update the Jets3tProperties on all the executors.

Based on the link above, I added an extra snippet inside a .foreachPartition() call to update the Jets3tProperties; the closure body runs on the executors before the partition is iterated, so the properties get set in every executor JVM that processes data.

Here is the code:

Dataset dataset = sqlContext
        .read()
        .format("com.databricks.spark.redshift")
        .option("url", redshiftUrl)
        .option("query", query)
        .option("aws_iam_role", aws_iam_role)
        .option("tempdir", "s3a://bucket-name/temp-dir")
        .load();

// The lambda body executes on the executors, so the jets3t properties are
// updated in every executor JVM that processes a partition (re-applying the
// same values for later partitions is harmless).
dataset.foreachPartition(partition -> {
    Jets3tProperties myProperties =
            Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
    myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
    myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
    myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
});
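Setting the properties once per partition is redundant, though harmless. To run the update at most once per executor JVM instead, a static guard can be used; the sketch below is hypothetical (the Jets3tV4Config class name and the java.util.concurrent.atomic.AtomicBoolean guard are mine, not from the original post):

public class Jets3tV4Config {
    // Flipped at most once per JVM, no matter how many partitions land on it.
    private static final AtomicBoolean CONFIGURED = new AtomicBoolean(false);

    public static void ensureConfigured() {
        if (CONFIGURED.compareAndSet(false, true)) {
            Jets3tProperties myProperties =
                    Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
            myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
            myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
            myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
        }
    }
}

// Then, inside each executor-side closure:
dataset.foreachPartition(partition -> Jets3tV4Config.ensureConfigured());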
Answered 2017-01-26T15:53:13.787

That stack trace means you are using the older jets3t-based s3n connector, while the credentials you are setting only apply to the newer s3a one. Use a URL like s3a:// to pick up the new connector.

Given that you are trying to use the V4 API, you will also need to set fs.s3a.endpoint. The 400 Bad Request response is exactly what you see when you try to authenticate against the central endpoint with v4 auth.
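For reference, a minimal sketch of what the corrected setup might look like, assuming Hadoop 2.7+ (where s3a supports fs.s3a.endpoint) and the standard s3a credential keys, fs.s3a.access.key and fs.s3a.secret.key, rather than the fs.s3a.awsAccessKeyId / fs.s3a.awsSecretAccessKey names from the question:

SparkContext sparkContext = SparkSession.builder().appName("CreditModeling").getOrCreate().sparkContext();

// Standard s3a credential keys (the fs.s3a.awsAccessKeyId /
// fs.s3a.awsSecretAccessKey names used in the question are not read by s3a).
sparkContext.hadoopConfiguration().set("fs.s3a.access.key", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.secret.key", fs_s3a_awsSecretAccessKey);
// V4-only regions such as ap-south-1 need the region-specific endpoint.
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");
// Do NOT set fs.s3a.impl to NativeS3FileSystem: that routes s3a:// URLs
// through the old jets3t-based s3n code, which is what the stack trace shows.

With this in place, the tempdir URL s3a://bucket-name/temp-dir from the question actually goes through the s3a connector.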

Answered 2017-01-15T15:12:27.900