我正在尝试将级联查询的结果写入 MySQL 数据库。为此,我使用cascading-jdbc并遵循我在此处找到的示例。我正在使用cascading-jdbc-core
和cascading-jdbc-mysql
版本3.0.0
。
我正在从我的 REPL 中精确执行这段代码:
(let [data [["foo1" "bar1"]
["foo2" "bar2"]]
query-params (into-array String ["?col1" "?col2"])
column-names (into-array String ["col1" "col2"])
update-params (into-array String ["?col1"])
update-column-names (into-array String ["col1"])
jdbc-tap (fn []
(let [scheme (JDBCScheme.
(Fields. query-params)
column-names
nil
(Fields. update-params)
update-column-names)
table-desc (TableDesc.
"test_table"
query-params
column-names
(into-array String []))
tap (JDBCTap.
"jdbc:mysql://192.168.99.101:3306/test_db?user=root&password=my-secret-pw"
"com.mysql.jdbc.Driver"
table-desc
scheme)]
tap))]
(?<- (jdbc-tap)
[?col1 ?col2]
(data ?col1 ?col2)))
当我运行代码时,我在 REPL 中看到了这些日志:
15/12/11 11:08:44 INFO hadoop.FlowMapper: sinking to: JDBCTap{connectionUrl='jdbc:mysql://192.168.99.101:3306/test_db?user=root&password=my-secret-pw', driverClassName='com.mysql.jdbc.Driver', tableDesc=TableDesc{tableName='test_table', columnNames=[?col1, ?col2], columnDefs=[col1, col2], primaryKeys=[]}}
15/12/11 11:08:44 INFO mapred.Task: Task:attempt_local1324562503_0006_m_000000_0 is done. And is in the process of commiting
15/12/11 11:08:44 INFO mapred.LocalJobRunner:
15/12/11 11:08:44 INFO mapred.Task: Task 'attempt_local1324562503_0006_m_000000_0' done.
15/12/11 11:08:44 INFO mapred.LocalJobRunner: Finishing task: attempt_local1324562503_0006_m_000000_0
15/12/11 11:08:44 INFO mapred.LocalJobRunner: Map task executor complete.
一切看起来都很好。但是,没有写入数据。我对此进行了检查tcpdump
,甚至没有与我的本地 MySQL 数据库建立连接。此外,当我将 JDBC-connection-string 更改为明显错误的值(不存在的用户名、不存在的数据库名称,甚至不存在数据库服务器的 IP)时,我会得到相同的日志抱怨什么。
此外,更改jdbc-tap
tostdout
会产生预期值。
我根本不知道如何调试它。是否有可能产生错误输出?现在,我不知道出了什么问题。