2

我想记录我的 Spark 作业在p6spy的帮助下执行的 JDBC 语句。

使用 p6spy 通常很简单:将字符串:p6spy:插入 jdbc url 并将 p6spy 驱动程序类包含到应用程序的类路径中。之后,所有 jdbc 操作都将记录到一个文件中。

例如,如果原始 (MySQL) 连接字符串是

jdbc:mysql://172.17.0.2:3306/spark_test
启用日志记录的连接字符串将是
jdbc:p6spy: mysql://172.17.0.2:3306/spark_test

我使用这一行将数据帧写入 MySQL 表

df.write.mode(SaveMode.Overwrite).jdbc("jdbc:p6spy:mysql://172.17.0.2:3306/spark_test", "test_table", prop)

包含prop数据库用户和密码。

这行代码失败并显示错误消息

Exception in thread "main" java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near '"value" INTEGER NOT NULL)' at line 1

如果没有:p6spy:连接字符串中的部分,一切都会按预期工作。


到目前为止我的发现

错误的原因是 Spark 尝试执行语句

CREATE TABLE test_table ("value" INTEGER NOT NULL)

其中包括列名周围的 。对于 MySQL ,正确的字符是` 。

Spark 可以处理不同的 SQL 方言。方言在包org.apache.spark.sql.jdbc中实现。根据数据库的 jdbc url 选择要使用的方言。每个方言对象都实现该方法canHandle(url : String)MySQLDialect处理以 . 开头的 url,但jdbc:mysql不处理以jdbc:p6spy:mysql. 不幸的是,对于未知的 url 类型,Spark 默认使用NoopDialect 。这种方言在列名周围添加了" 。


可能的解决方案

可以通过调用JdbcDialects.registerDialect来注册新的数据库方言。在这里可以注册一种实现该canHandle方法的新方言

override def canHandle(url: String): Boolean = url.startsWith("jdbc:p6spy:mysql")

然后将所有其他方法调用委托给原始 MySQL 方言。

不幸的是 MySQLDialect 对象被声明为

private case object MySQLDialect extends JdbcDialect {
  ...
}

所以我自己的方言实现不能直接使用 MySQLDialect。一种选择是将 MySQLDialect 的代码复制到我自己的方言对象中(代码不长),但我想避免复制代码。

还有其他选择吗?

我正在使用 Spark 2.4.5

4

0 回答 0