1

我正在运行 LocalStack 并尝试使用下面的代码将 DataFrame 写入 S3。

val spark = SparkSession
    .builder()
    .appName("LocalStack Test")
    .master("local[*]")
    .config("spark.hadoop.fs.s3a.endpoint", "http://0.0.0.0:4572")
    .config("fs.s3a.path.style.access", "true")
    .getOrCreate()

val df = spark.sqlContext.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("test.csv")

df.write
    .mode(SaveMode.Overwrite)
    .save(s"s3a://test/test2.csv")

这会引发以下异常:

Caused by: com.amazonaws.SdkClientException: Unable to verify integrity of data upload.  Client calculated content hash (contentMD5: 1B2M2Y8AsgTpgAmY7PhCfg== in base 64) didn't match hash (etag: c20aef10d728c21878be739244ab1080 in hex) calculated by Amazon S3.  You may need to delete the data stored in Amazon S3. (metadata.contentMD5: null, md5DigestStream: com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream@2a6b3577, bucketName: spark, key: test/_temporary/0/)

似乎这是一个最近解决的已知问题,但 Spark 仍然存在问题。创建 SparkSession 时是否需要设置任何其他配置选项?

4

1 回答 1

1

“spark.hadoop.fs.s3a.endpoint”设置为奇怪...这是本地 S3 服务器吗?

如果是这样:尝试将 s3a 强制降低到 v2 签名 xml <property> <name>fs.s3a.signing-algorithm</name> <value>AWS3SignerType</value> </property> 我不会做出任何承诺它会起作用,只是已知它会让问题消失“一次”

ps:CSV inferSchema 对 S3 来说真的很昂贵,因为文件将被完全读取以计算模式,然后第二次用于计算。做一次,打印结果,然后从那时起使用该模式。

于 2018-02-14T23:21:19.633 回答