您需要执行以下操作(例如假设典型的员工表):
JobConf conf = new JobConf(getConf(), MyDriver.class);
conf.setInputFormat(DBInputFormat.class);
DBConfiguration.configureDB(conf, “com.mysql.jdbc.Driver”, “jdbc:mysql://localhost/mydatabase”); String [] fields = { “employee_id”, "name" };
DBInputFormat.setInput(conf, MyRecord.class, “employees”, null /* conditions */, “employee_id”, fields);
...
// other necessary configuration
JobClient.runJob(conf);
和configureDB()
调用setInput()
配置DBInputFormat
. 第一个调用指定要使用的 JDBC 驱动程序实现以及要连接到的数据库。第二个调用指定要从数据库加载哪些数据。MyRecord 类是在 Java 中读取数据的类,“employees”是要读取的表的名称。“employee_id”参数指定表的主键,用于排序结果。下面的“InputFormat 的限制”部分解释了为什么这是必要的。最后,fields 数组列出了要读取的表的哪些列。的重载定义setInput()
允许您指定要从中读取的任意 SQL 查询。
调用configureDB()
and之后setInput()
,您应该像往常一样配置其余的作业,设置 Mapper 和 Reducer 类,指定要读取的任何其他数据源(例如,HDFS 中的数据集)和其他特定于作业的参数。
您需要创建自己的实现Writable
- 如下所示(将 id 和 name 视为表字段):
class MyRecord implements Writable, DBWritable {
long id;
String name;
public void readFields(DataInput in) throws IOException {
this.id = in.readLong();
this.name = Text.readString(in);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.id = resultSet.getLong(1);
this.name = resultSet.getString(2); }
public void write(DataOutput out) throws IOException {
out.writeLong(this.id);
Text.writeString(out, this.name); }
public void write(PreparedStatement stmt) throws SQLException {
stmt.setLong(1, this.id);
stmt.setString(2, this.name); }
}
然后,映射器接收您的 DBWritable 实现的实例作为其输入值。输入key是数据库提供的一个row id;您很可能会丢弃此值。
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyRecord, LongWritable, Text> {
public void map(LongWritable key, MyRecord val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
// Use val.id, val.name here
output.collect(new LongWritable(val.id), new Text(val.name));
}
}
更多信息:阅读以下链接(我的答案的实际来源): http: //blog.cloudera.com/blog/2009/03/database-access-with-hadoop/