9

我有 Cassandra 数据库,我通过 Apache Spark 使用 SparkSQL 分析了数据。现在我想将这些分析的数据插入到 PostgreSQL 中。除了使用 PostgreSQL 驱动程序之外,还有什么方法可以直接实现这一点(我使用 postREST 和驱动程序实现了它,我想知道是否有类似的方法saveToCassandra())?

4

4 回答 4

13

目前还没有将 RDD 写入任何 DBMS 的本地实现。以下是 Spark 用户列表中相关讨论的链接:

一般来说,最高效的方法如下:

  1. 验证RDD中的分区数,不能太低也不能太高。20-50 个分区应该没问题,如果数字较低 - 调用repartition20 个分区,如果更高 - 调用coalesce50 个分区
  2. 调用mapPartition转换,在其中调用函数以使用 JDBC 将记录插入 DBMS。在此函数中,您打开与数据库的连接并使用此 API的 COPY 命令,这将允许您消除对每条记录的单独命令的需要 - 这样插入的处理速度会更快

这样,您可以使用多达 50 个并行连接以并行方式将数据插入 Postgres(取决于您的 Spark 集群大小及其配置)。整个方法可以实现为接受 RDD 和连接字符串的 Java/Scala 函数

于 2015-02-03T15:33:03.820 回答
2

您可以使用 Postgres 复制 api 来编写它,这样会快得多。请参阅以下两种方法 - 一种遍历 RDD 以填充可以通过复制 api 保存的缓冲区。您唯一需要注意的是创建将由复制 api 使用的 csv 格式的正确语句。

def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
        val sb = mutable.StringBuilder.newBuilder
        val now = System.currentTimeMillis()

        rdd.collect().foreach(itr => {
            itr.foreach(_.createCSV(sb, now).append("\n"))
        })

        copyIn("myTable",  new StringReader(sb.toString), "statement")
        sb.clear
    }


def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
        val conn = connectionPool.getConnection()
        try {
            conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
        } catch {
            case se: SQLException => logWarning(se.getMessage)
            case t: Throwable => logWarning(t.getMessage)
        } finally {
            conn.close()
        }
    }
于 2015-05-26T23:00:57.737 回答
1

0x0FFF 的回答很好。这是一个有用的附加点。

我曾经foreachPartition坚持到外部商店。这也符合Design Patterns for using foreachRDDSpark 文档 https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams中给出的设计模式

例子:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
于 2015-03-22T06:08:01.213 回答
1

上面的答案是指旧的 spark 版本,在 spark 2.* 中有 jdbc 连接器,允许从数据帧直接写入 RDBS。

例子:

jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

于 2019-05-25T22:16:24.087 回答