在上述案例(数据库)中如何创建 InputSplits?
// Split the rows into n-number of chunks and adjust the last chunk
// accordingly
for (int i = 0; i < chunks; i++) {
DBInputSplit split;
if ((i + 1) == chunks)
split = new DBInputSplit(i * chunkSize, count);
else
split = new DBInputSplit(i * chunkSize, (i * chunkSize)
+ chunkSize);
splits.add(split);
}
有方法,但要了解它取决于什么,让我们看一下 chunkSize:
statement = connection.createStatement();
results = statement.executeQuery(getCountQuery());
results.next();
long count = results.getLong(1);
int chunks = job.getConfiguration().getInt("mapred.map.tasks", 1);
long chunkSize = (count / chunks);
所以 chunkSize 采用 count =SELECT COUNT(*) FROM tableName并将其除以 chunks =mapred.map.tasks或 1 如果未在配置中定义。
最后,每个输入拆分都将RecordReader创建一个来处理您正在读取的数据库类型,例如:MySQLDBRecordReader对于 MySQL 数据库。
有关更多信息,请查看来源