0

我正在尝试使用 Spark 创建与 IBM COS(云对象存储)的连接。Spark 版本 = 2.4.4,Scala 版本 = 2.11.12。

我使用正确的凭据在本地运行它,但我观察到以下错误 - “方案没有文件系统:cos”

我正在共享代码片段以及错误日志。有人可以帮我解决这个问题。

提前致谢 !

代码片段:

import com.ibm.ibmos2spark.CloudObjectStorage
import org.apache.spark.sql.SparkSession

object CosConnection extends App{
  var credentials = scala.collection.mutable.HashMap[String, String](
      "endPoint"->"ENDPOINT",
      "accessKey"->"ACCESSKEY",
      "secretKey"->"SECRETKEY"
  )
  var bucketName = "FOO"
  var objectname = "xyz.csv"

  var configurationName = "softlayer_cos" 

  val spark = SparkSession
    .builder()
    .appName("Connect IBM COS")
    .master("local")
    .getOrCreate()


  spark.sparkContext.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
  spark.sparkContext.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
  spark.sparkContext.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")

  var cos = new CloudObjectStorage(spark.sparkContext, credentials, configurationName=configurationName)

  var dfData1 = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load(cos.url(bucketName, objectname))

  dfData1.printSchema()
  dfData1.show(5,0)
}

错误:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: cos
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
4

3 回答 3

1

此问题已通过使用 SPARK 版本 = 2.4.4、SCALA 版本 = 2.11.12 映射以下 stocator 依赖关系得到解决

// https://mvnrepository.com/artifact/com.ibm.stocator/stocator
libraryDependencies += "com.ibm.stocator" % "stocator" % "1.0.24"

确保stocator-1.0.24-jar-with-dependencies.jar在构建包时有外部库

还要确保您将端点传递s3.us.cloud-object-storage.appdomain.cloudhttps://s3.us.cloud-object-storage.appdomain.cloud

您可以手动构建 stocator jar 并将 jar 包含target/stocator-1.0.24-SNAPSHOT-IBM-SDK.jar到 ClassPath 中(如果需要) -

git clone https://github.com/SparkTC/stocator
cd stocator
git fetch
git checkout -b 1.0.24-ibm-sdk origin/1.0.24-ibm-sdk
mvn clean install –DskipTests
于 2020-05-12T12:39:45.680 回答
0

您必须.config("spark.hadoop.fs.stocator.scheme.list", "cos")与其他一些fs.cos...配置一起设置。

这是一个有效的 Python 端到端代码段示例。转换成 Scala 应该很简单:

from pyspark.sql import SparkSession

stocator_jar = '/path/to/stocator-1.1.2-SNAPSHOT-IBM-SDK.jar'
cos_instance_name = '<myCosIntanceName>'
bucket_name = '<bucketName>'
s3_region = '<region>'
cos_iam_api_key = '*******'
iam_servicce_id = 'crn:v1:bluemix:public:iam-identity::<****************>'

spark_builder = (
    SparkSession
        .builder
        .appName('test_app'))

spark_builder.config('spark.driver.extraClassPath', stocator_jar)
spark_builder.config('spark.executor.extraClassPath', stocator_jar)
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.api.key", cos_iam_api_key)
spark_builder.config(f"fs.cos.{cos_instance_name}.endpoint", f"s3.{s3_region}.cloud-object-storage.appdomain.cloud")
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.service.id", iam_servicce_id)
spark_builder.config("spark.hadoop.fs.stocator.scheme.list", "cos")
spark_builder.config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
spark_builder.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
spark_builder.config("fs.stocator.cos.scheme", "cos")

spark_sess = spark_builder.getOrCreate()

dataset = spark_sess.range(1, 10)
dataset = dataset.withColumnRenamed('id', 'user_idx')

dataset.repartition(1).write.csv(
    f'cos://{bucket_name}.{cos_instance_name}/test.csv',
    mode='overwrite',
    header=True)

spark_sess.stop()
print('done!')
于 2020-09-07T17:40:04.743 回答
0

我在 Windows 10 上使用 spark 版本 2.4.5 和 Scala 版本 2.11.12。我已经在环境变量中为这两个版本添加了类路径。

  1. 启动 Spark shell 的命令(打开命令提示符并粘贴以下命令):

    spark-shell --packages com.ibm.stocator:stocator:1.0.36

如果您获得以下详细信息,则表示您已成功启动 spark-shell。 在此处输入图像描述

您还可以在浏览器上检查它,如命令提示符中给出的 -- Spark context Web UI available at http://localhost:4040 (在您的情况下端口可能会更改)。

  1. 在scala中设置配置信息(我的COS位置是us-east):

    sc.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
    sc.hadoopConfiguration.set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
    sc.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
    sc.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")
    sc.hadoopConfiguration.set("fs.cos.mycos.access.key", "your access key")
    sc.hadoopConfiguration.set("fs.cos.mycos.secret.key", "your secret key")
    sc.hadoopConfiguration.set("fs.cos.mycos.endpoint", "https://s3.us-east.cloud-object-storage.appdomain.cloud")
    
  2. 从清单文件中获取对象列表:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val cosContent = sqlContext.read.text("cos://someBucketName.mycos/someFile.mf")
    cosContent.show(false)
    

在此处输入图像描述

或者,您可以从 parquet 文件中读取数据,如下所示:

 val event1 = sqlContext.read.parquet("cos://someBucketName.mycos/parquetDirectoryName/")
 event1.printSchema()
 event1.count()
 event1.show(false)

在此处输入图像描述

于 2020-11-04T11:23:40.597 回答