0

我正在尝试在 Hadoop 进程中将 MySQL 设置为输入。如何为 Hadoop 使用 DBInputFormat 类 - 版本 1.0.3 中的 MySQL 连接?通过 hadoop-1.0.3/docs/api/ 中的 JobConf 配置作业不起作用。

// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

FileInputFormat.setInputPaths(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setCombinerClass(MyJob.MyReducer.class);
job.setReducerClass(MyJob.MyReducer.class);

job.setInputFormat(SequenceFileInputFormat.class);
job.setOutputFormat(SequenceFileOutputFormat.class);
4

2 回答 2

0

看看这个帖子。它展示了如何将数据从 Map Reduce 下沉到 MySQL 数据库。

于 2012-12-17T06:03:33.643 回答
0

您需要执行以下操作(例如假设典型的员工表):

JobConf conf = new JobConf(getConf(), MyDriver.class);
    conf.setInputFormat(DBInputFormat.class); 
    DBConfiguration.configureDB(conf, “com.mysql.jdbc.Driver”, “jdbc:mysql://localhost/mydatabase”); String [] fields = { “employee_id”, "name" };
    DBInputFormat.setInput(conf, MyRecord.class, “employees”, null /* conditions */, “employee_id”, fields); 
    ...
    // other necessary configuration
    JobClient.runJob(conf); 

configureDB()调用setInput()配置DBInputFormat. 第一个调用指定要使用的 JDBC 驱动程序实现以及要连接到的数据库。第二个调用指定要从数据库加载哪些数据。MyRecord 类是在 Java 中读取数据的类,“employees”是要读取的表的名称。“employee_id”参数指定表的主键,用于排序结果。下面的“InputFormat 的限制”部分解释了为什么这是必要的。最后,fields 数组列出了要读取的表的哪些列。的重载定义setInput()允许您指定要从中读取的任意 SQL 查询。

调用configureDB()and之后setInput(),您应该像往常一样配置其余的作业,设置 Mapper 和 Reducer 类,指定要读取的任何其他数据源(例如,HDFS 中的数据集)和其他特定于作业的参数。

您需要创建自己的实现Writable- 如下所示(将 id 和 name 视为表字段):

class MyRecord implements Writable, DBWritable { 
    long id; 
    String name; 

    public void readFields(DataInput in) throws IOException { 
        this.id = in.readLong(); 
        this.name = Text.readString(in); 
        } 

    public void readFields(ResultSet resultSet) throws SQLException { 
        this.id = resultSet.getLong(1); 
        this.name = resultSet.getString(2); } 

    public void write(DataOutput out) throws IOException { 
        out.writeLong(this.id); 
        Text.writeString(out, this.name); } 

    public void write(PreparedStatement stmt) throws SQLException { 
        stmt.setLong(1, this.id); 
        stmt.setString(2, this.name); } 
    } 

然后,映射器接收您的 DBWritable 实现的实例作为其输入值。输入key是数据库提供的一个row id;您很可能会丢弃此值。

public class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyRecord, LongWritable, Text> { 
public void map(LongWritable key, MyRecord val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { 
// Use val.id, val.name here 
output.collect(new LongWritable(val.id), new Text(val.name)); 
} 
} 

更多信息:阅读以下链接(我的答案的实际来源): http: //blog.cloudera.com/blog/2009/03/database-access-with-hadoop/

于 2012-12-13T19:42:31.867 回答