I'm running LocalStack and trying to write a DataFrame to S3 with the code below.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .appName("LocalStack Test")
  .master("local[*]")
  // LocalStack's S3 endpoint; both keys use the spark.hadoop. prefix so they
  // are propagated to the Hadoop configuration that S3A actually reads.
  .config("spark.hadoop.fs.s3a.endpoint", "http://0.0.0.0:4572")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  .getOrCreate()
val df = spark.read // spark.sqlContext.read is redundant since Spark 2.x
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("test.csv")

df.write
  .mode(SaveMode.Overwrite)
  .csv("s3a://test/test2.csv") // a bare save() would default to Parquet despite the .csv path
This throws the following exception:
Caused by: com.amazonaws.SdkClientException: Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 1B2M2Y8AsgTpgAmY7PhCfg== in base 64) didn't match hash (etag: c20aef10d728c21878be739244ab1080 in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (metadata.contentMD5: null, md5DigestStream: com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream@2a6b3577, bucketName: spark, key: test/_temporary/0/)
This looks like a known issue that was fixed recently, but the problem still occurs through Spark. Worth noting: the client-side hash in the message (1B2M2Y8AsgTpgAmY7PhCfg==) is the Base64 MD5 of zero bytes, so the check is failing on the empty _temporary/0/ marker object that Spark's output committer writes before the data itself. Do I need to set any additional configuration options when creating the SparkSession?
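For reference, below is a fuller configuration sketch I am experimenting with. The dummy credential values and the SSL flag are my assumptions for a LocalStack setup rather than a confirmed fix; the system property is the switch the AWS Java SDK v1 consults to skip its client-side PutObject MD5 validation.

import org.apache.spark.sql.SparkSession

// Assumption: disabling the SDK's client-side MD5 check sidesteps the etag
// mismatch. This must run before the S3A filesystem is first instantiated.
System.setProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true")

val spark = SparkSession
  .builder()
  .appName("LocalStack Test")
  .master("local[*]")
  .config("spark.hadoop.fs.s3a.endpoint", "http://0.0.0.0:4572")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  // Assumption: LocalStack accepts arbitrary non-empty credentials.
  .config("spark.hadoop.fs.s3a.access.key", "test")
  .config("spark.hadoop.fs.s3a.secret.key", "test")
  // The endpoint above is plain HTTP, so turn off SSL on the S3A client.
  .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
  .getOrCreate()

If there is a more standard way to make S3A tolerate LocalStack's etags, that would answer the question as well.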