3

I wrote some Python code to load files from Amazon Web Services (AWS) S3 through Apache Spark. Specifically, the code creates an RDD from the data directory of the bucket ruofan-bucket using SparkContext().wholeTextFiles("s3n://ruofan-bucket/data"). The code is shown below:

import os, sys, inspect

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark-1.4.0")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Setup pyspark directory path
pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
sys.path.append(pyspark_dir)

### Import pyspark
from pyspark import SparkConf, SparkContext

def main():
    ### Initialize the SparkConf and SparkContext
    conf = SparkConf().setAppName("ruofan").setMaster("local")
    sc = SparkContext(conf = conf)

    ### Create an RDD of (filename, content) pairs for the files in the directory "data"
    datafile = sc.wholeTextFiles("s3n://ruofan-bucket/data")    ### Read data directory from S3 storage.

    ### Collect files from the RDD
    datafile.collect()


if __name__ == "__main__":
    main()

Before running my code, I exported the environment variables AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID. However, when I run the code, it shows this error:

IOError: [Errno 2] No such file or directory: 's3n://ruofan-bucket/data/test1.csv'

I am sure the directory and the files exist on AWS S3, but I don't understand this error. I would really appreciate it if anyone could help me solve the problem.


2 Answers

1

It seems that wholeTextFiles does not work with Amazon S3.

See:

However, there may be differences between Hadoop versions, so don't take this as definitive.
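
If that turns out to be the problem here, one workaround is to list the objects yourself and build the (path, content) pairs that wholeTextFiles would have returned. A minimal sketch, assuming the boto 2 library is installed and that the credentials come from the exported AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables:

import boto
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ruofan").setMaster("local")
sc = SparkContext(conf=conf)

### boto reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment.
conn = boto.connect_s3()
bucket = conn.get_bucket("ruofan-bucket")

### Build the (path, content) pairs that wholeTextFiles would normally produce.
pairs = [("s3n://ruofan-bucket/" + key.name, key.get_contents_as_string())
         for key in bucket.list(prefix="data/")
         if not key.name.endswith("/")]

datafile = sc.parallelize(pairs)
print(datafile.collect())

Note that this pulls the file contents through the driver before parallelizing, so it is only reasonable for small data sets.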

Answered 2015-08-02T06:02:32.507
0

You can try the following approach to load the data from S3 into an RDD and then loop over and print the results. Once you are using Spark SQL, you can apply whatever transformations you need.

  // Imports needed for the snippet below (SparkSession and the schema types).
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  val spark = SparkSession
    .builder()
    .appName("Spark SQL POC")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  val sparkContext = spark.sparkContext

  // Use the native S3 filesystem for the s3:// scheme and pass the credentials explicitly.
  sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
  sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secret)
  sparkContext.hadoopConfiguration.set("fs.s3.endpoint", region)

  // The schema is encoded in a string
  val schemaString = "country displayName alias status userName userID exportTime city title email registrationDate firstName lastName dateOfBirth address1 address2 address3 postCode telephoneNumber nickName exclusionExpiryDate exclusionPeriod blocked blockReason lastLoginDate gender mobileNumber marketingSms marketingEmail affiliateMarker depositLimitDate depositLimitPeriod depositLimitAmount depositLimitCurrency depositLimitWaitForUpdate depositLimitUpdatePeriod depositLimitUpdateAmount depositLimitUpdateCurrency userModified selfExclusionModified userBlockModified registeredBy userNote"

  // Generate the schema based on the string of schema
  val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))

  val schema = StructType(fields)

  var s3Users = spark.sqlContext.read.schema(schema).json("s3://asgaard-data/users/*/*/*/*/").rdd

  // Apply the schema to the RDD
  val usersDataFrame = spark.createDataFrame(s3Users, schema)

  // Creates a temporary view using the DataFrame
  usersDataFrame.createOrReplaceTempView("users")

  // SQL can be run over a temporary view created using DataFrames
  val results = spark.sql("SELECT userName FROM users limit 10")

  results.map(attributes => "UserName: " + attributes(0)).show()

The versions used are as follows:

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency> 

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
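
For reference, a rough PySpark equivalent of the configuration above (only a sketch, assuming Spark 2.x with the S3 connector on the classpath; the credential variables and the S3 path are placeholders you would fill in yourself):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Spark SQL POC")
         .master("local")
         .getOrCreate())

### Same Hadoop settings as in the Scala snippet above.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hconf.set("fs.s3.awsAccessKeyId", access_key)        ### placeholder
hconf.set("fs.s3.awsSecretAccessKey", secret_key)    ### placeholder

### Read JSON directly into a DataFrame and query it with Spark SQL.
df = spark.read.json("s3://your-bucket/path/")       ### hypothetical path
df.createOrReplaceTempView("users")
spark.sql("SELECT userName FROM users LIMIT 10").show()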
Answered 2017-04-04T15:10:38.877