2

我目前正在开发一个使用 JDK1.7 编译的项目,使用 Cascading 1.2(即将升级到 2.1)创建和运行 Hadoop 作业,并使用 Hadoop 的 Cloudera 发行版(0.20.2-cdh3u3)。

我正在研究如何修改我的 Cascading/Hadoop 作业以从 MySQL db 读取和写入所有数据。看起来 SQOOP 可能能够处理这个问题。

但是,到目前为止,我所看到的关于如何在 Java 中执行此操作的信息或文档很少(我知道 SQOOP 主要应该用于从 shell 中调用的批处理作业)——我遵循的 Java 示例没有t 为我工作。我曾尝试使用 SQOOP 1.4 并将我的项目切换为使用 JDK1.6,因为我认为这是必需的,(尽管它会破坏我项目的其他部分)但我仍然无法让它工作。

有谁知道我想要实现的目标是否可能?其他人如何处理这个问题?SQOOP2 的发布会有帮助吗?

当我尝试运行 org.apache.sqoop.tool.ExportTool 以将 CSV 导出到表时,我看到的错误类型是:

由于(很可能)类加载器问题而无法初始化 javac 处理器:java.lang.NoClassDefFoundError: com/sun/tools/javac/processing/JavacProcessingEnvironment

注意:\tmp\sqoop-my.name\compile\9031edc8e43167c10f9f895b64aa79d5\MyTableName.java 使用或覆盖已弃用的 API。

运行导出作业时遇到 IOException:java.io.IOException: 无法将 jar \tmp\sqoop-my.name\compile\9031edc8e43167c10f9f895b64aa79d5\MyTableName.jar 加载到 JVM 中。(找不到类 MyTableName。)

4

3 回答 3

2

Sqoop 设计用于在 MySQL/其他关系数据库和 Hadoop/HBase 之间导出/导入数据。可以在这里找到一个非常好的关于 sqoop 的教程,它解释了它的各种功能。不确定这是否是您想要做的。

如果您需要在 MapReduce 作业中从 MySQL 读取/写入数据,DBInputFormat/DBOutput可以按照@Charles 的建议使用 hadoop 类

于 2013-06-06T15:41:00.757 回答
1

如果您只想将作业输出写入 MySQL,我建议您使用另一种输出格式DBOutputFormat如下所述:

伴随类 DBOutputFormat 将允许您将结果写回数据库。设置作业时,调用 conf.setOutputFormat(DBOutputFormat.class); 然后像以前一样调用 DBConfiguration.configureDB() 。

然后 DBOutputFormat.setOutput() 方法定义结果将如何写回数据库。它的三个参数是作业的 JobConf 对象、定义要写入的表的名称的字符串和定义要填充的表的字段的字符串数组。例如,DBOutputFormat.setOutput(job, "employees", "employee_id", "name");。

您之前创建的相同 DBWritable 实现足以将记录注入数据库。write(PreparedStatement stmt) 方法将在您从 reducer 传递给 OutputCollector 的 DBWritable 的每个实例上调用。在 reduce 结束时,这些 PreparedStatement 对象将转换为 INSERT 语句以针对 SQL 数据库运行。

其中“与以前一样”指的是该指令:

DBConfiguration.configureDB(conf, “com.mysql.jdbc.Driver”, “jdbc:mysql://localhost/mydatabase”);

要从 MySQL 中读取,与DBInputFormat.

于 2013-06-06T15:23:28.280 回答
1

谢谢查尔斯和维卡斯。这无疑使我走上了正确的轨道。我最终使用了https://github.com/cwensel/cascading.jdbc,它使用 Hadoop 类DBInputFormat/DBOutput来轻松设置读取和写入 db 的级联作业。

要写,我只是将我的水龙头的输出流更改为:

String url = "jdbc:mysql://localhost:3306/mydb?user=myusername&password=mypassword";
String driver = "com.mysql.jdbc.Driver";
String tableName = "mytable";   
String[] columnNames = {'col1', 'col2', 'col3'}; //Columns I want to write to 
TableDesc tableDesc = new TableDesc( tableName );

JDBCScheme dbScheme = new JDBCScheme( columnNames );
Tap dbOutputTap = new JDBCTap( url, driver, tableDesc, dbScheme );

为了从 db 中读取数据,我只需轻按一下,如下所示:

String url = "jdbc:mysql://localhost:3306/mydb?user=myusername&password=mypassword";
String driver = "com.mysql.jdbc.Driver";
String tableName = "mytable";      
String[] columnNames = {'col1', 'col2', 'col3'}; //Columns I want to read from 
TableDesc tableDesc = new TableDesc( tableName );

JDBCScheme dbScheme = new JDBCScheme( columnNames, "col1<40" );
Tap dbInputTap = new JDBCTap( url, driver, tableDesc, dbScheme );

我也遇到了 Cascading-DBMigrate,但似乎这仅用于从数据库读取而不是写入它们。

于 2013-06-17T12:22:54.770 回答