10

是否可以将 Microsoft Sql Server(以及 oracle、mysql 等)中的数据读取到 Spark 应用程序中的 rdd 中?或者我们是否需要创建一个内存集并将其并行化为一个 RDD?

4

2 回答 2

6

在 Spark 1.4.0+ 中,您现在可以使用sqlContext.read.jdbc

这将为您提供 DataFrame 而不是 Row 对象的 RDD。

相当于您在上面发布的解决方案将是

sqlContext.read.jdbc("jdbc:sqlserver://omnimirror;databaseName=moneycorp;integratedSecurity=true;", "TABLE_NAME", "id", 1, 100000, 1000, new java.util.Properties)

它应该获取表的模式,但如果你想强制它,你可以在阅读后使用模式方法sqlContext.read.schema(...insert schema here...).jdbc(...rest of the things...)

请注意,您不会在这里获得 SomeClass 的 RDD(在我看来这更好)。相反,您将获得相关字段的 DataFrame。

更多信息可以在这里找到:http ://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

于 2015-08-28T13:23:55.263 回答
5

从邮件列表中找到了解决方案。JdbcRDD 可用于完成此操作。我需要获取 MS Sql Server JDBC 驱动程序 jar 并将其添加到我的项目的库中。我想使用集成安全性,因此需要将 sqljdbc_auth.dll(可在同一下载中获得)放在 java.library.path 可以看到的位置。然后,代码如下所示:

     val rdd = new JdbcRDD[Email](sc,
          () => {DriverManager.getConnection(
 "jdbc:sqlserver://omnimirror;databaseName=moneycorp;integratedSecurity=true;")},
          "SELECT * FROM TABLE_NAME Where ? < X and X < ?",
            1, 100000, 1000,
          (r:ResultSet) => { SomeClass(r.getString("Col1"), 
            r.getString("Col2"), r.getString("Col3")) } )

这给出了 SomeClass 的 Rdd。第二、第三和第四个参数是必需的,用于下限和上限,以及分区数。换句话说,该源数据需要通过 long 进行分区才能正常工作。

于 2014-10-23T11:30:59.970 回答