我知道如何通过命令行使用 sqoop。但不知道如何使用 java 程序调用 sqoop 命令。任何人都可以提供一些代码视图吗?
5 回答
您可以通过在您的类路径中包含 sqoop jar 并调用该 Sqoop.runTool()
方法,从您的 java 代码中运行 sqoop。您必须以编程方式创建 sqoop 所需的参数,就好像它是命令行(例如--connect
等)一样。
请注意以下事项:
- 确保 sqoop 工具名称(例如导入/导出等)是第一个参数。
- 注意类路径排序 - 执行可能会失败,因为 sqoop 需要版本 X 的库并且您使用不同的版本。确保 sqoop 需要的库不会被您自己的依赖项所掩盖。我在使用 commons-io(sqoop 需要 v1.4)时遇到过这样的问题,并且由于我使用的是 commons-io v1.2,因此出现了 NoSuchMethod 异常。
- 每个参数都需要位于单独的数组元素上。例如,“--connect jdbc:mysql:...”应该作为数组中的两个独立元素传递,而不是一个。
- sqoop 解析器知道如何接受双引号参数,因此如果需要,请使用双引号(我总是建议)。唯一的例外是 fields-delimited-by 参数,它需要一个字符,所以不要双引号。
- 我建议将命令行参数创建逻辑和实际执行分开,这样您的逻辑就可以在不实际运行该工具的情况下进行正确测试。
- 最好使用 --hadoop-home 参数,以防止对环境的依赖。
Sqoop.runTool()
相对于的优点Sqoop.Main()
是runTool()
返回执行的错误代码。
希望有帮助。
final int ret = Sqoop.runTool(new String[] { ... });
if (ret != 0) {
throw new RuntimeException("Sqoop failed - return code " + Integer.toString(ret));
}
强化学习
在下面找到在 Java 程序中使用 sqoop 将数据从 MySQL 导入 HDFS/HBase 的示例代码。确保你的类路径中有 sqoop jar:
SqoopOptions options = new SqoopOptions();
options.setConnectString("jdbc:mysql://HOSTNAME:PORT/DATABASE_NAME");
//options.setTableName("TABLE_NAME");
//options.setWhereClause("id>10"); // this where clause works when importing whole table, ie when setTableName() is used
options.setUsername("USERNAME");
options.setPassword("PASSWORD");
//options.setDirectMode(true); // Make sure the direct mode is off when importing data to HBase
options.setNumMappers(8); // Default value is 4
options.setSqlQuery("SELECT * FROM user_logs WHERE $CONDITIONS limit 10");
options.setSplitByCol("log_id");
// HBase options
options.setHBaseTable("HBASE_TABLE_NAME");
options.setHBaseColFamily("colFamily");
options.setCreateHBaseTable(true); // Create HBase table, if it does not exist
options.setHBaseRowKeyColumn("log_id");
int ret = new ImportTool().run(options);
正如 Harel 所建议的,我们可以使用 run() 方法的输出进行错误处理。希望这会有所帮助。
有一个技巧对我来说效果很好。通过 ssh,可以直接执行 Sqoop 命令。只是你必须使用的是一个 SSH Java 库
这与 Java 无关。您只需要包含要执行导入的远程系统中安装的任何 SSH 库和 sqoop。现在通过 ssh 连接到系统并执行将数据从 MySQL 导出到 hive 的命令。
您必须遵循此步骤。
下载 sshxcute java 库:https ://code.google.com/p/sshxcute/ 并将其添加到包含以下 Java 代码的 java 项目的构建路径中
import net.neoremind.sshxcute.core.SSHExec;
import net.neoremind.sshxcute.core.ConnBean;
import net.neoremind.sshxcute.task.CustomTask;
import net.neoremind.sshxcute.task.impl.ExecCommand;
public class TestSSH {
public static void main(String args[]) throws Exception{
// Initialize a ConnBean object, the parameter list is IP, username, password
ConnBean cb = new ConnBean("192.168.56.102", "root","hadoop");
// Put the ConnBean instance as parameter for SSHExec static method getInstance(ConnBean) to retrieve a singleton SSHExec instance
SSHExec ssh = SSHExec.getInstance(cb);
// Connect to server
ssh.connect();
CustomTask sampleTask1 = new ExecCommand("echo $SSH_CLIENT"); // Print Your Client IP By which you connected to ssh server on Horton Sandbox
System.out.println(ssh.exec(sampleTask1));
CustomTask sampleTask2 = new ExecCommand("sqoop import --connect jdbc:mysql://192.168.56.101:3316/mysql_db_name --username=mysql_user --password=mysql_pwd --table mysql_table_name --hive-import -m 1 -- --schema default");
ssh.exec(sampleTask2);
ssh.disconnect();
}
}
请按照 vikas 给出的代码对我有用,并将这些 jar 文件包含在类路径中并导入这些包
导入 com.cloudera.sqoop.SqoopOptions;导入 com.cloudera.sqoop.tool.ImportTool;
参考图书馆
- Sqoop-1.4.4 jar /sqoop
- ojdbc6.jar /sqoop/lib(用于 oracle)
- commons-logging-1.1.1.jar hadoop/lib
- hadoop-core-1.2.1.jar /hadoop
- commons-cli-1.2.jar hadoop/lib
- commmons-io.2.1.jar hadoop/lib
- commons-configuration-1.6.jar hadoop/lib
- commons-lang-2.4.jar hadoop/lib
- jackson-core-asl-1.8.8.jar hadoop/lib
- jackson-mapper-asl-1.8.8.jar hadoop/lib
- commons-httpclient-3.0.1.jar hadoop/lib
JRE系统库
1.resources.jar jdk/jre/lib 2.rt.jar jdk/jre/lib 3. jsse.jar jdk/jre/lib 4. jce.jar jdk/jre/lib 5. charsets,jar jdk/jre/lib 6. jfr.jar jdk/jre/lib 7. dnsns.jar jdk/jre/lib/ext 8. sunec.jar jdk/jre/lib/ext 9. zipfs.jar jdk/jre/lib/ext 10. sunpkcs11. jar jdk/jre/lib/ext 11. localedata.jar jdk/jre/lib/ext 12. sunjce_provider.jar jdk/jre/lib/ext
有时,如果您的 Eclipse 项目使用 JDK1.6 并且您添加的库是 JDK1.7,则有时会出现错误,在这种情况下,在 Eclipse 中创建项目时配置 JRE。
Vikas,如果我想将导入的文件放入配置单元,我应该使用 options.parameter ("--hive-import") 吗?
如果您知道可执行文件的位置和可以使用的命令行参数ProcessBuilder
,则可以单独运行它Process
,Java 可以监控它的完成并返回代码。