11

使用 Spark 1.4.0,我正在尝试使用 insertIntoJdbc() 将来自 Spark DataFrame 的数据插入到 MemSQL 数据库中(这应该与与 MySQL 数据库的交互完全相同)。但是,我不断收到 Runtime TableAlreadyExists 异常。

首先,我像这样创建 MemSQL 表:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);

然后我在 Spark 中创建一个简单的数据框并尝试像这样插入到 MemSQL 中:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.
4

3 回答 3

7

此解决方案适用于一般的 JDBC 连接,尽管@wayne 的答案可能是专门针对 memSQL 的更好解决方案。

insertIntoJdbc 从 1.4.0 开始似乎已被弃用,使用它实际上调用 write.jdbc()。

write() 返回一个 DataFrameWriter 对象。如果要将数据附加到表中,则必须将对象的保存模式更改为"append".

上述问题中示例的另一个问题是 DataFrame 架构与目标表的架构不匹配。

下面的代码给出了 Spark shell 的一个工作示例。我spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar用来启动我的 spark-shell 会话。

import java.util.Properties

val prop = new Properties() 
prop.put("user", "root")
prop.put("password", "")  

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")   
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 
于 2015-10-02T22:02:14.267 回答
3

insertIntoJDBC 文档实际上是不正确的;他们说该表必须已经存在,但实际上如果存在,它会抛出一个错误,如上所示:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

我们建议使用我们的 MemSQL Spark 连接器,您可以在此处找到:

https://github.com/memsql/memsql-spark-connector

如果在代码中包含该库并导入 com.memsql.spark.connector._,则可以使用 df.saveToMemSQL(...) 将 DataFrame 保存到 MemSQL。您可以在此处找到我们的连接器的文档:

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions

于 2015-10-02T21:16:09.080 回答
1

我有同样的问题。将 spark 版本更新到 1.6.2 工作正常

于 2016-09-21T10:14:08.830 回答