3

我在数据库中有一些数据,我想在 Spark 中使用sparklyr使用它。

我可以使用基于DBI的包将数据库中的数据导入 R

dbconn <- dbConnect(<some connection args>)
data_in_r <- dbReadTable(dbconn, "a table") 

然后使用将数据从 R 复制到 Spark

sconn <- spark_connect(<some connection args>)
data_ptr <- copy_to(sconn, data_in_r)

对于大数据集,复制两次很慢。

如何将数据直接从数据库复制到 Spark?

sparklyr有几个spark_read_*()导入功能,但与数据库无关。 sdf_import()看起来有可能,但目前尚不清楚如何在这种情况下使用它。

4

1 回答 1

6

Sparklyr >= 0.6.0

您可以使用spark_read_jdbc.

Sparklyr < 0.6.0

我希望有一个更优雅的解决方案,但这里有一个使用低级 API 的最小示例:

  • 确保 Spark 可以访问所需的 JDBC 驱动程序,例如通过将其坐标添加到spark.jars.packages. 例如,使用 PostgreSQL(针对当前版本进行调整),您可以添加:

    spark.jars.packages org.postgresql:postgresql:9.4.1212
    

    SPARK_HOME/conf/spark-defaults.conf

  • 加载数据并注册为临时视图:

    name <- "foo"
    
    spark_session(sc) %>% 
      invoke("read") %>% 
      # JDBC URL and table name
      invoke("option", "url", "jdbc:postgresql://host/database") %>% 
      invoke("option", "dbtable", "table") %>% 
      # Add optional credentials
      invoke("option", "user", "scott") %>%
      invoke("option", "password", "tiger") %>% 
      # Driver class, here for PostgreSQL
      invoke("option", "driver", "org.postgresql.Driver") %>% 
      # Read and register as a temporary view
      invoke("format", "jdbc") %>% 
      invoke("load") %>% 
      # Spark 2.x, registerTempTable in 1.x
      invoke("createOrReplaceTempView", name)
    

    options您可以使用 : 一次传递多个environment

    invoke("options", as.environment(list(
      user="scott", password="tiger", url="jdbc:..."
    )))
    
  • 加载临时视图dplyr

    dplyr::tbl(sc, name)
    
  • 请务必阅读更多 JDBC 选项,重点partitionColumn*BoundnumPartitions

  • 有关其他详细信息,请参见如何使用 JDBC 源在 (Py)Spark 中写入和读取数据?以及如何使用 DataFrame 和 JDBC 连接来提高慢速 Spark 作业的性能?

于 2017-01-31T21:39:35.637 回答