下面是我试图用来在 S3 中创建 carbondata 表的代码片段。然而,尽管在 hadoopconfiguration 中设置了 aws 凭证,它仍然抱怨没有设置密钥和访问密钥。这里有什么问题?
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("s3n://url")
carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","<accesskey>")
carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","<secretaccesskey>")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table(id string,name string,city string,age Int) STORED BY 'carbondata'")
最后一个命令产生错误:
java.lang.IllegalArgumentException:必须将 AWS 访问密钥 ID 和秘密访问密钥指定为 s3n URL 的用户名或密码(分别),或者通过设置 fs.s3n.awsAccessKeyId 或 fs.s3n.awsSecretAccessKey 属性(分别)
Spark Version : 2.2.1
Command used to start spark-shell:
$SPARK_PATH/bin/spark-shell --jars /localpath/jar/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2.jar,/localpath/jar/spark-avro_2.11-4.0.0.jar --packages com.amazonaws:aws-java-sdk-pom:1.9.22,org.apache.hadoop:hadoop-aws:2.7.2,org.slf4j:slf4j-simple:1.7.21,asm:asm:3.2,org.xerial.snappy:snappy-java:1.1.7.1,com.databricks:spark-avro_2.11:4.0.0
更新:
发现 S3 支持仅在 1.4.0 RC1 中可用。所以我构建了 RC1 并针对相同的代码测试了下面的代码。但我似乎仍然遇到了问题。任何帮助表示赞赏。代码:
import org.apache.spark.sql.CarbonSession._
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
object sample4 {
def main(args: Array[String]) {
val (accessKey, secretKey, endpoint) = getKeyOnPrefix("s3n://")
//val rootPath = new File(this.getClass.getResource("/").getPath
// + "../../../..").getCanonicalPath
val path = "/localpath/sample/data1.csv"
val spark = SparkSession
.builder()
.master("local")
.appName("S3UsingSDKExample")
.config("spark.driver.host", "localhost")
.config(accessKey, "<accesskey>")
.config(secretKey, "<secretkey>")
//.config(endpoint, "s3-us-east-1.amazonaws.com")
.getOrCreateCarbonSession()
spark.sql("Drop table if exists carbon_table")
spark.sql(
s"""
| CREATE TABLE if not exists carbon_table(
| shortField SHORT,
| intField INT,
| bigintField LONG,
| doubleField DOUBLE,
| stringField STRING,
| timestampField TIMESTAMP,
| decimalField DECIMAL(18,2),
| dateField DATE,
| charField CHAR(5),
| floatField FLOAT
| )
| STORED BY 'carbondata'
| LOCATION 's3n://bucketName/table/carbon_table'
| TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
""".stripMargin)
}
def getKeyOnPrefix(path: String): (String, String, String) = {
val endPoint = "spark.hadoop." + ENDPOINT
if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
} else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
"spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
} else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
"spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
} else {
throw new Exception("Incorrect Store Path")
}
}
def getSparkMaster(args: Array[String]): String = {
if (args.length == 6) args(5)
else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
else "local"
}
}
错误:
18/05/17 12:23:22 ERROR SegmentStatusManager: main Failed to read metadata of load org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.ServiceException: Request Error: Empty key
我还尝试了(也尝试了 s3、s3n、s3a 协议)中的示例代码:
https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
运行为:
S3Example.main(Array("accesskey","secretKey","s3://bucketName/path/carbon_table"," https://bucketName.s3.amazonaws.com ","local"))
错误堆栈跟踪:
org.apache.hadoop.fs.s3.S3Exception:org.jets3t.service.S3ServiceException:请求错误:org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:175) 处的空键.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221) 在 sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect .Method.invoke(Method.java:498) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java :102) com.sun.proxy.$Proxy21.retrieveINode(Unknown Source) at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340) 在 org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426) 在 org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isFileExist(AbstractDFSCarbonFile.java:426) 在 org.apache.carbondata .core.datastore.impl.FileFactory.isFileExist(FileFactory.java:201) 在 org.apache.carbondata.core.statusmanager.SegmentStatusManager.readTableStatusFile(SegmentStatusManager.java:246) 在 org.apache.carbondata.core.statusmanager.SegmentStatusManager .readLoadMetadata(SegmentStatusManager.java:197) 在 org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(ManageDictionaryAndBTree.java:101) 在 org.apache.spark.sql.hive.CarbonFileMetastore.dropTable(CarbonFileMetastore.scala :460) 在 org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand。processMetadata(CarbonCreateTableCommand.scala:148) at org.apache.spark.sql.execution.command.MetadataCommand.run(package.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute( commands.scala:58) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala: 67) 在 org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:107) 在 org.apache.spark.sql.Dataset.(Dataset.scala:183) 在 org.apache。 spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:96) at org.apache.spark.sql.CarbonSession.withProfiler(CarbonSession.scala:144) at org.apache.spark.sql.CarbonSession。 sql(CarbonSession.scala:94) 在 $line19。$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$S3Example$.main(:68) at $line26.$read$$iw$$iw$$iw $$iw$$iw$$iw$$iw$$iw.(:31) 在 $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(: 36) 在 $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:38) 在 $line26.$read$$iw$$iw$$iw$$iw$ $iw.(:40) 在 $line26.$read$$iw$$iw$$iw$$iw.(:42) 在 $line26.$read$$iw$$iw$$iw.(:44)在 $line26.$read$$iw$$iw.(:46) 在 $line26.$read$$iw.(:48) 在 $line26.$read.(:50) 在 $line26.$read$。 (:54) 在 $line26.$read$.() 在 $line26.$eval$.$print$lzycompute(:7) 在 $line26.$eval$.$print(:6) 在 $line26.$eval .$print() 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786) 在 scala.tools.nsc.interpreter.IMain $Request.loadAndRun(IMain.scala:1047) at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638) at scala.tools.nsc.interpreter.IMain$WrappedRequest $$anonfun$loadAndRunReq$1.apply(IMain.scala:637) at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31) at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader .scala:19) 在 scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569) 在 scala.tools 的 scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)。 nsc.interpreter.IMain.interpret(IMain.scala:565) 在 scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807) 在 scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681) 在 scala.tools.nsc.interpreter.ILoop .processLine(ILoop.scala:395) at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp (ILoop.scala:923) 在 scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) 在 scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply (ILoop.scala:909) at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97) at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) at org.apache .spark.repl.Main$.doMain(Main.scala:74) 在 org.apache.spark.repl.Main$.main(Main.scala:54) 在 org.apache.spark.repl.Main。main(Main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java .lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775) at org.apache .spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 在 org.apache.spark.deploy.SparkSubmit$.main (SparkSubmit.scala:119) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 原因:org.jets3t.service.S3ServiceException:请求错误:org.jets3t.service.S3Service 处的键为空。getObject(S3Service.java:1470) 在 org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:163)
是我传递错误的任何论点。我可以使用 aws cli 访问 s3 路径:
aws s3 ls s3://bucketName/path
存在于 S3 中。