据我所理解;
sort by 仅在 reducer 中排序
order by 在全球范围内订购东西,但将所有东西都塞进一个减速器
cluster by 智能地通过 key hash 将内容分配到 reducer 中并进行排序
所以我的问题是集群是否保证全球秩序?分发方式将相同的键放入相同的减速器中,但相邻的键呢?
我可以找到的唯一文档是here,从示例中它似乎在全球范围内订购它们。但从定义上看,我觉得它并不总是那样做。
更简短的回答:是的CLUSTER BY
,只要您愿意自己加入多个输出文件,就可以保证全局排序。
更长的版本:
ORDER BY x
: 保证全局排序,但是通过只通过一个 reducer 推送所有数据来做到这一点。对于大型数据集,这基本上是不可接受的。你最终得到一个排序的文件作为输出。SORT BY x
: 在 N 个 reducer 中的每一个上排序数据,但每个 reducer 可以接收重叠范围的数据。您最终会得到 N 个或更多具有重叠范围的排序文件。DISTRIBUTE BY x
: 确保 N 个 reducer 中的每一个都获得 的非重叠范围x
,但不对每个 reducer 的输出进行排序。您最终会得到 N 个或更多具有非重叠范围的未排序文件。CLUSTER BY x
: 确保 N 个 reducer 中的每一个都获得不重叠的范围,然后在 reducer 上按这些范围进行排序。这为您提供全局排序,并且与执行 (DISTRIBUTE BY x
和SORT BY x
) 相同。您最终会得到 N 个或更多具有非重叠范围的排序文件。说得通?所以CLUSTER BY
基本上是更具可扩展性的ORDER BY
.
首先让我澄清一下:clustered by
仅将您的密钥分配到不同的存储桶中,clustered by ... sorted by
对存储桶进行排序。
通过一个简单的实验(见下文),您可以看到默认情况下您不会获得全局订单。原因是默认分区器使用哈希码拆分键,而不管实际键顺序如何。
但是,您可以获得完全有序的数据。
动机是 Tom White 的“Hadoop:权威指南”(第 3 版,第 8 章,第 274 页,Total Sort),他在其中讨论了 TotalOrderPartitioner。
我将首先回答您的 TotalOrdering 问题,然后描述我所做的几个与排序相关的 Hive 实验。
请记住:我在这里描述的是“概念证明”,我能够使用 Claudera 的 CDH3 发行版处理一个示例。
最初我希望 org.apache.hadoop.mapred.lib.TotalOrderPartitioner 能解决问题。不幸的是,它没有,因为它看起来像 Hive 的值分区,而不是键。所以我修补它(应该有子类,但我没有时间):
代替
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(key);
}
和
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(value);
}
现在您可以将(修补的)TotalOrderPartitioner 设置为您的 Hive 分区器:
hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
hive> set total.order.partitioner.natural.order=false
hive> set total.order.partitioner.path=/user/yevgen/out_data2
我也用过
hive> set hive.enforce.bucketing = true;
hive> set mapred.reduce.tasks=4;
在我的测试中。
文件 out_data2 告诉 TotalOrderPartitioner 如何存储值。您通过对数据进行采样来生成 out_data2。在我的测试中,我使用了从 0 到 10 的 4 个存储桶和密钥。我使用 ad-hoc 方法生成了 out_data2:
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;
public class TotalPartitioner extends Configured implements Tool{
public static void main(String[] args) throws Exception{
ToolRunner.run(new TotalPartitioner(), args);
}
@Override
public int run(String[] args) throws Exception {
Path partFile = new Path("/home/yevgen/out_data2");
FileSystem fs = FileSystem.getLocal(getConf());
HiveKey key = new HiveKey();
NullWritable value = NullWritable.get();
SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
writer.append(key, value);
key.set( new byte[]{1, 6}, 0, 2);//partition at 6
writer.append(key, value);
key.set( new byte[]{1, 9}, 0, 2);//partition at 9
writer.append(key, value);
writer.close();
return 0;
}
}
然后我将生成的 out_data2 复制到 HDFS(到 /user/yevgen/out_data2)
通过这些设置,我对数据进行了分桶/排序(请参阅我的实验列表中的最后一项)。
这是我的实验。
创建样本数据
bash> echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt
创建基本测试表:
蜂巢>创建表测试(x int);hive> 将数据本地路径“data.txt”加载到表测试中;
基本上这个表包含从 0 到 9 的值,没有顺序。
演示表复制是如何工作的(实际上是 mapred.reduce.tasks 参数,它设置了要使用的最大减少任务数)
hive> 创建表 test2(x int);
蜂巢> 设置 mapred.reduce.tasks=4;
hive> 插入覆盖表 test2 从测试 a 中选择 ax 连接测试 b on ax=bx; -- stupied join 强制非平凡的 map-reduce
bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0
1
5
9
展示分桶。您可以看到键是随机分配的,没有任何排序顺序:
hive> 创建由 (x) 聚类的表 test3(x int) 成 4 个桶;
hive> 设置 hive.enforce.bucketing = true;
蜂巢>插入覆盖表test3从测试中选择*;
bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0
4
8
0
带排序的分桶。结果是部分排序的,不是完全排序的
hive> create table test4(x int) clustered by (x) sorted by (x desc)分为4个桶;
蜂巢>插入覆盖表test4从测试中选择*;
bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0
1
5
9
您可以看到值按升序排序。看起来像 CDH3 中的 Hive 错误?
在没有集群的情况下通过语句进行部分排序:
hive> create table test5 as select x from test distribution by x sort by x desc;
bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0
9
5
1
使用我修补的 TotalOrderParitioner:
hive> 设置 hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
蜂巢> 设置 total.order.partitioner.natural.order=false
hive> 设置 total.order.partitioner.path=/user/training/out_data2
hive> create table test6(x int) clustered by (x) sorted by (x)分为4个桶;
蜂巢>插入覆盖表test6从测试中选择*;
bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0
1
2
0
bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0
3
4
5
bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0
7
6
8
bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0
9
CLUSTER BY 不产生全局排序。
接受的答案(由 Lars Yencken 撰写)通过声明减速器将接收非重叠范围来误导。正如 Anton Zaviriukhin 正确指向 BucketedTables 文档一样,CLUSTER BY 基本上是 DISTRIBUTE BY(与 bucketing 相同)加上每个 bucket/reducer 中的 SORT BY。并且 DISTRIBUTE BY 只是简单地将散列和 mods 放入桶中,虽然散列函数可以保留顺序(i 的散列 > j 的散列,如果 i > j),但散列值的 mod 不会。
这是显示重叠范围的更好示例
据我了解,简短的回答是否定的。你会得到重叠的范围。
来自SortBy 文档:“Cluster By 是 Distribute By 和 Sort By 的捷径。” “具有相同 Distribute By 列的所有行都将进入同一个 reducer。” 但是没有信息可以通过保证不重叠的范围来分发。
此外,来自DDL BucketedTables 文档:“Hive 如何在存储桶之间分配行?通常,存储桶编号由表达式 hash_function(bucketing_column) mod num_buckets 确定。” 我想 Cluster by in Select 语句使用相同的原则在减速器之间分配行,因为它的主要用途是用数据填充分桶表。
我创建了一个包含 1 个整数列“a”的表,并在那里插入了从 0 到 9 的数字。
然后我将减速器的数量设置为 2
set mapred.reduce.tasks = 2;
。
并且select
该表中的数据带有Cluster by
子句
select * from my_tab cluster by a;
并收到了我预期的结果:
0
2
4
6
8
1
3
5
7
9
所以,第一个减速器(数字 0)得到偶数(因为它们的模式 2 给出 0)
第二个减速器(编号 1)得到奇数(因为它们的模式 2 给出 1)
这就是“分发方式”的工作原理。
然后“排序依据”对每个减速器内的结果进行排序。
用例:当有一个大数据集时,应该像 sort by 那样进行排序,所有集合 reducer 在合并之前在内部对数据进行排序,从而提高性能。在按顺序排列时,较大数据集的性能会降低,因为所有数据都通过单个减速器传递,这会增加负载并因此需要更长的时间来执行查询。请参见下面关于 11 节点集群的示例。
我观察到,sort by、cluster by和distribute by的数字是相同的,但内部机制不同。在 DISTRIBUTE BY 中:相同的列行将转到一个 reducer,例如。DISTRIBUTE BY(City) - 一列中的班加罗尔数据,一个减速器中的德里数据:
SortBy:N个或更多具有重叠范围的排序文件。
OrderBy:单输出,即完全排序。
分发方式:通过保护 N 个减速器中的每一个来分发,以获得列的非重叠范围,但不对每个减速器的输出进行排序。
欲了解更多信息http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/
ClusterBy:参考上面同样的例子,如果我们使用Cluster By x,两个reducer会进一步对x上的行进行排序:
如果我理解正确
1.sort by - 只对reducer内的数据进行排序
2.order by - 通过将整个数据集推送到单个减速器来全局排序。如果我们确实有很多数据(倾斜),这个过程将花费很多时间。
Cluster by 是按 reducer 排序的,不是全局的。在许多书中,它也被错误地或混淆地提及。它有特殊用途,例如您将每个部门分配给特定的减速器,然后按每个部门中的员工姓名排序,并且不关心要使用的集群的部门顺序,并且由于工作负载分布在减速器之间,它的性能更高.