-1

下面是我试图用来在 S3 中创建 carbondata 表的代码片段。然而,尽管在 hadoopconfiguration 中设置了 aws 凭证,它仍然抱怨没有设置密钥和访问密钥。这里有什么问题?

 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.CarbonSession._
 val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("s3n://url")
carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","<accesskey>")
   carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","<secretaccesskey>")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table(id string,name string,city string,age Int) STORED BY 'carbondata'")

最后一个命令产生错误:

java.lang.IllegalArgumentException:必须将 AWS 访问密钥 ID 和秘密访问密钥指定为 s3n URL 的用户名或密码(分别),或者通过设置 fs.s3n.awsAccessKeyId 或 fs.s3n.awsSecretAccessKey 属性(分别)

Spark Version : 2.2.1
Command used to start spark-shell:
$SPARK_PATH/bin/spark-shell --jars /localpath/jar/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2.jar,/localpath/jar/spark-avro_2.11-4.0.0.jar --packages com.amazonaws:aws-java-sdk-pom:1.9.22,org.apache.hadoop:hadoop-aws:2.7.2,org.slf4j:slf4j-simple:1.7.21,asm:asm:3.2,org.xerial.snappy:snappy-java:1.1.7.1,com.databricks:spark-avro_2.11:4.0.0

更新:

发现 S3 支持仅在 1.4.0 RC1 中可用。所以我构建了 RC1 并针对相同的代码测试了下面的代码。但我似乎仍然遇到了问题。任何帮助表示赞赏。代码:

import org.apache.spark.sql.CarbonSession._
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
object sample4 {
def main(args: Array[String]) {
val (accessKey, secretKey, endpoint) = getKeyOnPrefix("s3n://")
//val rootPath = new File(this.getClass.getResource("/").getPath
//                            + "../../../..").getCanonicalPath
val path = "/localpath/sample/data1.csv"
val spark = SparkSession
      .builder()
      .master("local")
      .appName("S3UsingSDKExample")
      .config("spark.driver.host", "localhost")
      .config(accessKey, "<accesskey>")
      .config(secretKey, "<secretkey>")
      //.config(endpoint, "s3-us-east-1.amazonaws.com")
      .getOrCreateCarbonSession()
      spark.sql("Drop table if exists carbon_table")

    spark.sql(
      s"""
         | CREATE TABLE if not exists carbon_table(
         | shortField SHORT,
         | intField INT,
         | bigintField LONG,
         | doubleField DOUBLE,
         | stringField STRING,
         | timestampField TIMESTAMP,
         | decimalField DECIMAL(18,2),
         | dateField DATE,
         | charField CHAR(5),
         | floatField FLOAT
         | )
         | STORED BY 'carbondata'
         | LOCATION 's3n://bucketName/table/carbon_table'
         | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
       """.stripMargin)

}


def getKeyOnPrefix(path: String): (String, String, String) = {
    val endPoint = "spark.hadoop." + ENDPOINT
    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
    } else {
      throw new Exception("Incorrect Store Path")
    }
  }
  def getSparkMaster(args: Array[String]): String = {
    if (args.length == 6) args(5)
    else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
    else "local"
  }
}

错误:

18/05/17 12:23:22 ERROR SegmentStatusManager: main Failed to read metadata of load
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.ServiceException: Request Error: Empty key

我还尝试了(也尝试了 s3、s3n、s3a 协议)中的示例代码:

https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala

运行为:

S3Example.main(Array("accesskey","secretKey","s3://bucketName/path/carbon_table"," https://bucketName.s3.amazonaws.com ","local"))

错误堆栈跟踪:

org.apache.hadoop.fs.s3.S3Exception:org.jets3t.service.S3ServiceException:请求错误:org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:175) 处的空键.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221) 在 sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect .Method.invoke(Method.java:498) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java :102) com.sun.proxy.$Proxy21.retrieveINode(Unknown Source) at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340) 在 org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426) 在 org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isFileExist(AbstractDFSCarbonFile.java:426) 在 org.apache.carbondata .core.datastore.impl.FileFactory.isFileExist(FileFactory.java:201) 在 org.apache.carbondata.core.statusmanager.SegmentStatusManager.readTableStatusFile(SegmentStatusManager.java:246) 在 org.apache.carbondata.core.statusmanager.SegmentStatusManager .readLoadMetadata(SegmentStatusManager.java:197) 在 org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(ManageDictionaryAndBTree.java:101) 在 org.apache.spark.sql.hive.CarbonFileMetastore.dropTable(CarbonFileMetastore.scala :460) 在 org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand。processMetadata(CarbonCreateTableCommand.scala:148) at org.apache.spark.sql.execution.command.MetadataCommand.run(package.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute( commands.scala:58) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala: 67) 在 org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:107) 在 org.apache.spark.sql.Dataset.(Dataset.scala:183) 在 org.apache。 spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:96) at org.apache.spark.sql.CarbonSession.withProfiler(CarbonSession.scala:144) at org.apache.spark.sql.CarbonSession。 sql(CarbonSession.scala:94) 在 $line19。$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$S3Example$.main(:68) at $line26.$read$$iw$$iw$$iw $$iw$$iw$$iw$$iw$$iw.(:31) 在 $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(: 36) 在 $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:38) 在 $line26.$read$$iw$$iw$$iw$$iw$ $iw.(:40) 在 $line26.$read$$iw$$iw$$iw$$iw.(:42) 在 $line26.$read$$iw$$iw$$iw.(:44)在 $line26.$read$$iw$$iw.(:46) 在 $line26.$read$$iw.(:48) 在 $line26.$read.(:50) 在 $line26.$read$。 (:54) 在 $line26.$read$.() 在 $line26.$eval$.$print$lzycompute(:7) 在 $line26.$eval$.$print(:6) 在 $line26.$eval .$print() 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786) 在 scala.tools.nsc.interpreter.IMain $Request.loadAndRun(IMain.scala:1047) at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638) at scala.tools.nsc.interpreter.IMain$WrappedRequest $$anonfun$loadAndRunReq$1.apply(IMain.scala:637) at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31) at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader .scala:19) 在 scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569) 在 scala.tools 的 scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)。 nsc.interpreter.IMain.interpret(IMain.scala:565) 在 scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807) 在 scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681) 在 scala.tools.nsc.interpreter.ILoop .processLine(ILoop.scala:395) at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp (ILoop.scala:923) 在 scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) 在 scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply (ILoop.scala:909) at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97) at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) at org.apache .spark.repl.Main$.doMain(Main.scala:74) 在 org.apache.spark.repl.Main$.main(Main.scala:54) 在 org.apache.spark.repl.Main。main(Main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java .lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775) at org.apache .spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 在 org.apache.spark.deploy.SparkSubmit$.main (SparkSubmit.scala:119) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 原因:org.jets3t.service.S3ServiceException:请求错误:org.jets3t.service.S3Service 处的键为空。getObject(S3Service.java:1470) 在 org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:163)

是我传递错误的任何论点。我可以使用 aws cli 访问 s3 路径:

aws s3 ls s3://bucketName/path

存在于 S3 中。

4

2 回答 2

3

您可以使用此示例进行尝试https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala

在创建 carbonSession 之后,您必须先提供 aws 凭证属性才能触发。

如果您已经创建了 sparkContext 而没有提供 aws 属性。然后,即使您将其提供给 carbonContext,它也不会获取这些属性。

于 2018-05-17T07:42:24.423 回答
0

嗨 vikas 查看您的异常空密钥仅意味着您的访问密钥和密钥未在 carbon 会话中绑定,因为当我们提供 s3 实现时,我们编写的逻辑是,如果用户没有提供任何密钥,那么它们的值应该被视为空

所以为了让事情变得简单,首先使用这个命令构建碳数据罐

mvn -Pspark-2.1 clean package 然后用这个命令执行 spark submit

./spark-submit --jars file:///home/anubhav/Downloads/softwares/spark-2.2.1-bin-hadoop2.7/carbonlib/apache-carbondata-1.4.0-SNAPSHOT-bin-spark2.2.1 -hadoop2.7.2.jar --class org.apache.carbondata.examples.S3Example /home/anubhav/Documents/carbondata/carbondata/carbondata/examples/spark2/target/carbondata-examples-spark2-1.4.0-SNAPSHOT.jar当地的

用你的替换我的 jar 路径,看看它应该可以工作,它对我有用

于 2018-05-26T12:10:55.130 回答