我有 Cassandra 数据库,我通过 Apache Spark 使用 SparkSQL 分析了数据。现在我想将这些分析的数据插入到 PostgreSQL 中。除了使用 PostgreSQL 驱动程序之外,还有什么方法可以直接实现这一点(我使用 postREST 和驱动程序实现了它,我想知道是否有类似的方法saveToCassandra()
)?
4 回答
目前还没有将 RDD 写入任何 DBMS 的本地实现。以下是 Spark 用户列表中相关讨论的链接:一、二
一般来说,最高效的方法如下:
- 验证RDD中的分区数,不能太低也不能太高。20-50 个分区应该没问题,如果数字较低 - 调用
repartition
20 个分区,如果更高 - 调用coalesce
50 个分区 - 调用
mapPartition
转换,在其中调用函数以使用 JDBC 将记录插入 DBMS。在此函数中,您打开与数据库的连接并使用此 API的 COPY 命令,这将允许您消除对每条记录的单独命令的需要 - 这样插入的处理速度会更快
这样,您可以使用多达 50 个并行连接以并行方式将数据插入 Postgres(取决于您的 Spark 集群大小及其配置)。整个方法可以实现为接受 RDD 和连接字符串的 Java/Scala 函数
您可以使用 Postgres 复制 api 来编写它,这样会快得多。请参阅以下两种方法 - 一种遍历 RDD 以填充可以通过复制 api 保存的缓冲区。您唯一需要注意的是创建将由复制 api 使用的 csv 格式的正确语句。
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
val sb = mutable.StringBuilder.newBuilder
val now = System.currentTimeMillis()
rdd.collect().foreach(itr => {
itr.foreach(_.createCSV(sb, now).append("\n"))
})
copyIn("myTable", new StringReader(sb.toString), "statement")
sb.clear
}
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
val conn = connectionPool.getConnection()
try {
conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => logWarning(se.getMessage)
case t: Throwable => logWarning(t.getMessage)
} finally {
conn.close()
}
}
0x0FFF 的回答很好。这是一个有用的附加点。
我曾经foreachPartition
坚持到外部商店。这也符合Design Patterns for using foreachRDD
Spark 文档
https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams中给出的设计模式
例子:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
上面的答案是指旧的 spark 版本,在 spark 2.* 中有 jdbc 连接器,允许从数据帧直接写入 RDBS。
例子:
jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html