我想记录我的 Spark 作业在p6spy的帮助下执行的 JDBC 语句。
使用 p6spy 通常很简单:将字符串:p6spy:
插入 jdbc url 并将 p6spy 驱动程序类包含到应用程序的类路径中。之后,所有 jdbc 操作都将记录到一个文件中。
例如,如果原始 (MySQL) 连接字符串是
jdbc:mysql://172.17.0.2:3306/spark_test启用日志记录的连接字符串将是
jdbc:p6spy: mysql://172.17.0.2:3306/spark_test
我使用这一行将数据帧写入 MySQL 表
df.write.mode(SaveMode.Overwrite).jdbc("jdbc:p6spy:mysql://172.17.0.2:3306/spark_test", "test_table", prop)
包含prop
数据库用户和密码。
这行代码失败并显示错误消息
Exception in thread "main" java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near '"value" INTEGER NOT NULL)' at line 1
如果没有:p6spy:
连接字符串中的部分,一切都会按预期工作。
到目前为止我的发现
错误的原因是 Spark 尝试执行语句
CREATE TABLE test_table ("value" INTEGER NOT NULL)
其中包括列名周围的“ 。对于 MySQL ,正确的字符是` 。
Spark 可以处理不同的 SQL 方言。方言在包org.apache.spark.sql.jdbc中实现。根据数据库的 jdbc url 选择要使用的方言。每个方言对象都实现该方法canHandle(url : String)
。MySQLDialect处理以 . 开头的 url,但jdbc:mysql
不处理以jdbc:p6spy:mysql
. 不幸的是,对于未知的 url 类型,Spark 默认使用NoopDialect 。这种方言在列名周围添加了" 。
可能的解决方案
可以通过调用JdbcDialects.registerDialect来注册新的数据库方言。在这里可以注册一种实现该canHandle
方法的新方言
override def canHandle(url: String): Boolean = url.startsWith("jdbc:p6spy:mysql")
然后将所有其他方法调用委托给原始 MySQL 方言。
不幸的是 MySQLDialect 对象被声明为
private case object MySQLDialect extends JdbcDialect {
...
}
所以我自己的方言实现不能直接使用 MySQLDialect。一种选择是将 MySQLDialect 的代码复制到我自己的方言对象中(代码不长),但我想避免复制代码。
还有其他选择吗?
我正在使用 Spark 2.4.5