我正在处理 hadoop 中的输入日志文件,其中键分布不均匀。这意味着减速器的值分布不均匀。例如 key1 有 1 个值,而 key2 有 1000 个值。
有什么方法可以对与同一个键关联的值进行负载平衡[我也不想修改我的键]
我正在处理 hadoop 中的输入日志文件,其中键分布不均匀。这意味着减速器的值分布不均匀。例如 key1 有 1 个值,而 key2 有 1000 个值。
有什么方法可以对与同一个键关联的值进行负载平衡[我也不想修改我的键]
也许您可以在使用减速器之前使用组合器?这是相当投机的...
想法是将每组键划分为预设最大大小的分区,然后将这些分区的 k/v 对输出到 reducer。此代码假定您已在配置中的某处设置了该大小。
public static class myCombiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<Text> textList = new ArrayList<Text>();
int part = 0;
while (values.iterator().hasNext()) {
if (textList.size() <= Integer.parseInt(context.getConfiguration().get("yourMaxSize"))) {
textList.add(values.iterator().next());
} else {
for(Text t : textList) {
//essentially partitioning each key...
context.write(new Text(key.toString() + "_" + Integer.toString(part)), t);
}
textList.clear();
}
part += 1;
}
//output any stragglers ...
for(Text t : textList) {
context.write(new Text(key.toString() + "_" + Integer.toString(part)), t);
}
}
}
如果您知道哪些键将具有异常大量的值,则可以使用以下技巧。
您可以实现一个自定义Partitioner
,以确保您的每个倾斜键都进入一个分区,然后其他所有内容将由它们分配到其余分区hashCode
(这是默认值HashPartitioner
)。
Partitioner
您可以通过实现此接口来创建自定义:
public interface Partitioner<K, V> extends JobConfigurable {
int getPartition(K key, V value, int numPartitions);
}
然后你可以告诉 Hadoop 使用你Partitioner
的:
conf.setPartitionerClass(CustomPartitioner.class);