This notebook illustrates several ways to load data using the Scala language. Scala runs on the JVM, and Java and Scala classes can be mixed freely, whether they live in different projects or in the same one. Seeing how the Scala code interacts with the Openstack Swift object store should help guide you in writing a Java equivalent.
Building on the notebook above, the steps below show how to configure access to an Openstack Swift object storage instance and pull data from it using the Stocator library from Scala. The swift URL breaks down as:
swift2d :// container . myacct / filename.extension
   ^            ^           ^            ^
stocator     name of    namespace    object storage
protocol    container                  filename
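For reference, here is a minimal Scala sketch that assembles such a URL from its parts; the buildSwiftUrl name and its parameters are illustrative only and are not part of Stocator:

// Illustrative helper: build a swift2d URL from container, datasource name and object name.
// The datasource name must match the name registered in the Hadoop configuration below
// (for example "sparksql"); it is a configuration namespace, not an account name.
def buildSwiftUrl(container: String, datasourceName: String, objectName: String): String =
  s"swift2d://$container.$datasourceName/$objectName"

// buildSwiftUrl("notebooks", "sparksql", "file.xml") == "swift2d://notebooks.sparksql/file.xml"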
Imports
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import scala.util.control.NonFatal
import play.api.libs.json.Json

// "sc" is the SparkContext provided by the notebook environment.
val sqlctx = new SQLContext(sc)
val scplain = sqlctx.sparkContext
Sample credentials
// @hidden_cell
var credentials = scala.collection.mutable.HashMap[String, String](
"auth_url"->"https://identity.open.softlayer.com",
"project"->"object_storage_3xxxxxx3_xxxx_xxxx_xxxx_xxxxxxxxxxxx",
"project_id"->"6xxxxxxxxxx04fxxxxxxxxxx6xxxxxx7",
"region"->"dallas",
"user_id"->"cxxxxxxxxxxaxxxxxxxxxx1xxxxxxxxx",
"domain_id"->"cxxxxxxxxxxaxxyyyyyyxx1xxxxxxxxx",
"domain_name"->"853255",
"username"->"Admin_cxxxxxxxxxxaxxxxxxxxxx1xxxxxxxxx",
"password"->"""&M7372!FAKE""",
"container"->"notebooks",
"tenantId"->"undefined",
"filename"->"file.xml"
)
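The helper method below takes these credentials as a JSON string rather than as a Scala Map. A minimal sketch of that conversion with Play JSON (the same expression is reused in the load step at the end; the dsConfiguration name here is only illustrative):

// Serialise the credentials to the JSON string expected by setRemoteObjectStorageConfig.
// toMap converts the mutable HashMap into an immutable Map before serialisation.
val dsConfiguration = Json.toJson(credentials.toMap).toString
// Print only the length so the credentials themselves are not echoed into the notebook output.
println("credential JSON length: " + dsConfiguration.length)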
Helper method
def setRemoteObjectStorageConfig(name: String, sc: SparkContext, dsConfiguration: String): Boolean = {
  try {
    // Parse the JSON credentials string into a Map[String, String].
    val result = scala.util.parsing.json.JSON.parseFull(dsConfiguration)
    result match {
      case Some(e: Map[String, String]) => {
        // Register the Stocator swift2d driver and the Keystone V3 credentials
        // under this datasource name, so that swift2d://<container>.<name>/ URLs resolve.
        val prefix = "fs.swift2d.service." + name
        val hconf = sc.hadoopConfiguration
        hconf.set("fs.swift2d.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
        hconf.set(prefix + ".auth.url", e("auth_url") + "/v3/auth/tokens")
        hconf.set(prefix + ".tenant", e("project_id"))
        hconf.set(prefix + ".username", e("user_id"))
        hconf.set(prefix + ".password", e("password"))
        hconf.set(prefix + ".auth.method", "keystoneV3")
        hconf.set(prefix + ".region", e("region"))
        hconf.setBoolean(prefix + ".public", true)
        println("Successfully modified sparkcontext object with remote Object Storage Credentials using datasource name " + name)
        println("")
        return true
      }
      case None =>
        println("Failed.")
        return false
    }
  }
  catch {
    case NonFatal(exc) =>
      println(exc)
      return false
  }
}
Load data
val setObjStor = setRemoteObjectStorageConfig("sparksql", scplain, Json.toJson(credentials.toMap).toString)
val data_rdd = scplain.textFile("swift2d://notebooks.sparksql/" + credentials("filename"))
data_rdd.take(5)
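Since the point of this notebook is to show more than one way to load data, the same object can also be read through the DataFrame API once the Hadoop configuration is in place. A minimal sketch, assuming a Spark version where sqlContext.read.text is available (Spark 1.6 or later); it returns a DataFrame with a single value column of lines:

// Alternative load path: read the same swift2d URL as a DataFrame of text lines.
val data_df = sqlctx.read.text("swift2d://notebooks.sparksql/" + credentials("filename"))
data_df.show(5, false)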