2

我们有一些代码可以创建和使用本地 spark 并将 parquet 文件写入 S3。它适用于 Amazon S3 和 IBM Cloud Object Storage。但是当我建立一个 minIO 容器并将代码指向那里时,它会失败并出现如下错误:

org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
    at com.ibm.fhir.jbatch.bulkdata.export.common.SparkParquetWriter.writeParquet(SparkParquetWriter.java:102)
    at com.ibm.fhir.bulkcommon.SparkParquetWriterTest.testWriteCOSviaHMAC(SparkParquetWriterTest.java:87)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
    at org.testng.TestRunner.privateRun(TestRunner.java:648)
    at org.testng.TestRunner.run(TestRunner.java:505)
    at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
    at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
    at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
    at org.testng.SuiteRunner.run(SuiteRunner.java:364)
    at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
    at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
    at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
    at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
    at org.testng.TestNG.runSuites(TestNG.java:1049)
    at org.testng.TestNG.run(TestNG.java:1017)
    at org.testng.remote.AbstractRemoteTestNG.run(AbstractRemoteTestNG.java:115)
    at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:251)
    at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:77)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, 192.168.0.107, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output test.parquet/part-00000-4f28f646-e457-4a0c-a4fd-3c3242180b72-c000-attempt_20200730040459_0001_m_000000_1.snappy.parquet com.amazonaws.services.s3.model.AmazonS3Exception: Object-prefix is already an object, please choose a different object-prefix name. (Service: Amazon S3; Status Code: 400; Error Code: XMinioParentIsObject; Request ID: 1626792118789256; S3 Extended Request ID: 4f62aa42-7879-4a61-bb0f-4720577a9dd5), S3 Extended Request ID: 4f62aa42-7879-4a61-bb0f-4720577a9dd5
    at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:196)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:275)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
    ... 9 more

据我所知,spark/hadoop/stocator 会先写入一个名为 test.parquet 的空对象,然后再写入一个在逻辑上位于该对象“之下”的 parquet 对象(以模拟文件系统的层次结构)。不幸的是,minIO 开发人员坚持不支持这种用法,因为 minIO 的底层是文件系统,它会把对象键映射为真实的文件系统路径(因此那个空的 test.parquet 对象会妨碍它创建同名目录来存放分区文件)。

那么,还有其他方法可以通过 spark 将 parquet 写入 minIO 吗?有没有办法让它不要创建那个空对象(或者强制让该对象的名称以 `/` 结尾——我认为这样也许能适用于 minIO)?

写入的示例代码片段:

// Session-level options: run locally on all cores, with the Spark UI turned off.
SparkSession.Builder sessionBuilder = SparkSession.builder()
        .master("local[*]")
        .appName("parquetWriter")
        .config("spark.ui.enabled", false);

// Register the Stocator connector for the "cos" scheme and pass it the
// endpoint and HMAC credentials (ENDPOINT, ACCESS_KEY and SECRET_KEY are
// defined outside this snippet).
sessionBuilder = sessionBuilder
        .config("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
        .config("fs.stocator.scheme.list", "cos")
        .config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
        .config("fs.stocator.cos.scheme", "cos")
        .config("fs.cos.service.endpoint", ENDPOINT)
        .config("fs.cos.service.access.key", ACCESS_KEY)
        .config("fs.cos.service.secret.key", SECRET_KEY);

SparkSession spark = sessionBuilder.getOrCreate();

// Destination: bucket "mybucket" on the Stocator service named "service".
String targetPath = "cos://mybucket.service/test.parquet/";

// A single one-field JSON document is enough to reproduce the write.
List<String> jsonLines = Collections.singletonList("{\"a\":1}");
Dataset<String> rawJson = spark.createDataset(jsonLines, Encoders.STRING());

// Parse the JSON strings into a DataFrame, then write it in append mode
// after coalescing to one partition.
Dataset<?> parsed = spark.read().json(rawJson);
parsed.coalesce(1)
        .write()
        .mode("append")
        .parquet(targetPath);
4

0 回答 0