3

我正在尝试从 IBM Data Science Experience 连接到 IBM Cloud Object Storage:

access_key = 'XXX'
secret_key = 'XXX'
bucket = 'mybucket'
host = 'lon.ibmselect.objstor.com' 
service = 'mycos'

sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.myCos.access.key', access_key)
hconf.set('fs.cos.myCos.endpoint', 'http://' + host)
hconf.set('fs.cose.myCos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')

obj = 'mydata.tsv.gz'

rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())

这将返回:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: cos

我猜我需要使用基于 stocator docs的 'cos' 方案。但是,错误表明 stocator 不可用或者是旧版本?

有任何想法吗?


更新1:

我还尝试了以下方法:

sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')
hconf.set('fs.stocator.scheme.list', 'cos')
hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
hconf.set('fs.stocator.cos.scheme', 'cos')
hconf.set('fs.cos.mycos.access.key', access_key)
hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
hconf.set('fs.cos.mycos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')

service = 'mycos'
obj = 'mydata.tsv.gz'          
rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())

然而,这一次的反应是:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No object store for: cos
    at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:121)
    ...
Caused by: java.lang.ClassNotFoundException: com.ibm.stocator.fs.cos.COSAPIClient
4

5 回答 5

2

支持 fs.cos 方案的最新版 Stocator (v1.0.9) 尚未部署在 Spark aaService 上(即将推出)。请使用 stocator 方案“fs.s3d”连接到您的 COS。

例子:

endpoint = 'endpointXXX' 
access_key = 'XXX'
secret_key = 'XXX'

prefix = "fs.s3d.service"
hconf = sc._jsc.hadoopConfiguration()
hconf.set(prefix + ".endpoint", endpoint)
hconf.set(prefix + ".access.key", access_key)
hconf.set(prefix + ".secret.key", secret_key)

bucket = 'mybucket'
obj = 'mydata.tsv.gz'

rdd = sc.textFile('s3d://{0}.service/{1}'.format(bucket, obj))
rdd.count()

或者,您可以使用 ibmos2spark。该库已经安装在我们的服务上。例子:

import ibmos2spark

credentials = {
   'endpoint': 'endpointXXXX',
   'access_key': 'XXXX',
   'secret_key': 'XXXX'
}

configuration_name = 'os_configs' # any string you want
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name)

bucket = 'mybucket'
obj = 'mydata.tsv.gz'
rdd = sc.textFile(cos.url(obj, bucket))
rdd.count()
于 2017-09-04T11:18:13.150 回答
1

看起来 cos 驱动程序没有正确初始化。试试这个配置:

hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')

hconf.set('fs.stocator.scheme.list', 'cos')
hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
hconf.set('fs.stocator.cos.scheme', 'cos')

hconf.set('fs.cos.mycos.access.key', access_key)
hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
hconf.set('fs.cos.mycos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')

更新 1:

您还需要确保 stocator 类位于类路径上。您可以通过以下方式执行 pyspark 来使用包系统:

./bin/pyspark --packages com.ibm.stocator:stocator:1.0.24

这适用于swift2dcos方案。

更新 2:

只需遵循 Stocator 文档 ( https://github.com/CODAIT/stocator )。它包含所有详细信息如何安装它,使用什么分支等。

于 2017-09-02T08:23:23.827 回答
1

Stocator 位于 Spark 2.0 和 2.1 内核的类路径中,但未cos配置该方案。您可以通过在 Python 笔记本中执行以下命令来访问配置:

!cat $SPARK_CONF_DIR/core-site.xml

寻找物业fs.stocator.scheme.list。我目前看到的是:

<property>
    <name>fs.stocator.scheme.list</name>
    <value>swift2d,swift,s3d</value>
</property>

我建议您向 DSX 提出功能请求以支持该cos方案。

于 2017-09-04T05:50:16.530 回答
0

我发现了同样的问题,为了解决它,我只是改变了环境:

在 IBM Watson Studio 中,如果您在没有预配置 Spark 集群的环境中启动 Jupyter Notebook,则会收到该错误。安装PySpark是不够的。

相反,如果您启动一个具有可用 Spark 集群的笔记本,那么您就可以了。

于 2020-06-29T15:14:05.590 回答
0

您必须.config("spark.hadoop.fs.stocator.scheme.list", "cos")与其他一些fs.cos...配置一起设置。这是一个有效的端到端代码段示例(使用pyspark==2.3.2and测试Python 3.7.3):

from pyspark.sql import SparkSession

stocator_jar = '/path/to/stocator-1.1.2-SNAPSHOT-IBM-SDK.jar'
cos_instance_name = '<myCosIntanceName>'
bucket_name = '<bucketName>'
s3_region = '<region>'
cos_iam_api_key = '*******'
iam_servicce_id = 'crn:v1:bluemix:public:iam-identity::<****************>'

spark_builder = (
    SparkSession
        .builder
        .appName('test_app'))

spark_builder.config('spark.driver.extraClassPath', stocator_jar)
spark_builder.config('spark.executor.extraClassPath', stocator_jar)
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.api.key", cos_iam_api_key)
spark_builder.config(f"fs.cos.{cos_instance_name}.endpoint", f"s3.{s3_region}.cloud-object-storage.appdomain.cloud")
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.service.id", iam_servicce_id)
spark_builder.config("spark.hadoop.fs.stocator.scheme.list", "cos")
spark_builder.config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
spark_builder.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
spark_builder.config("fs.stocator.cos.scheme", "cos")

spark_sess = spark_builder.getOrCreate()

dataset = spark_sess.range(1, 10)
dataset = dataset.withColumnRenamed('id', 'user_idx')

dataset.repartition(1).write.csv(
    f'cos://{bucket_name}.{cos_instance_name}/test.csv',
    mode='overwrite',
    header=True)

spark_sess.stop()
print('done!')
于 2020-09-07T17:36:43.977 回答