65

据我所理解;

  • sort by 仅在 reducer 中排序

  • order by 在全球范围内订购东西,但将所有东西都塞进一个减速器

  • cluster by 智能地通过 key hash 将内容分配到 reducer 中并进行排序

所以我的问题是集群是否保证全球秩序?分发方式将相同的键放入相同的减速器中,但相邻的键呢?

我可以找到的唯一文档是here,从示例中它似乎在全球范围内订购它们。但从定义上看,我觉得它并不总是那样做。

4

8 回答 8

168

更简短的回答:是的CLUSTER BY,只要您愿意自己加入多个输出文件,就可以保证全局排序。

更长的版本:

  • ORDER BY x: 保证全局排序,但是通过只通过一个 reducer 推送所有数据来做到这一点。对于大型数据集,这基本上是不可接受的。你最终得到一个排序的文件作为输出。
  • SORT BY x: 在 N 个 reducer 中的每一个上排序数据,但每个 reducer 可以接收重叠范围的数据。您最终会得到 N 个或更多具有重叠范围的排序文件。
  • DISTRIBUTE BY x: 确保 N 个 reducer 中的每一个都获得 的非重叠范围x,但不对每个 reducer 的输出进行排序。您最终会得到 N 个或更多具有非重叠范围的未排序文件。
  • CLUSTER BY x: 确保 N 个 reducer 中的每一个都获得不重叠的范围,然后在 reducer 上按这些范围进行排序。这为您提供全局排序,并且与执行 (DISTRIBUTE BY xSORT BY x) 相同。您最终会得到 N 个或更多具有非重叠范围的排序文件。

说得通?所以CLUSTER BY基本上是更具可扩展性的ORDER BY.

于 2013-09-17T01:09:05.527 回答
15

首先让我澄清一下:clustered by仅将您的密钥分配到不同的存储桶中,clustered by ... sorted by对存储桶进行排序。

通过一个简单的实验(见下文),您可以看到默认情况下您不会获得全局订单。原因是默认分区器使用哈希码拆分键,而不管实际键顺序如何。

但是,您可以获得完全有序的数据。

动机是 Tom White 的“Hadoop:权威指南”(第 3 版,第 8 章,第 274 页,Total Sort),他在其中讨论了 TotalOrderPartitioner。

我将首先回答您的 TotalOrdering 问题,然后描述我所做的几个与排序相关的 Hive 实验。

请记住:我在这里描述的是“概念证明”,我能够使用 Claudera 的 CDH3 发行版处理一个示例。

最初我希望 org.apache.hadoop.mapred.lib.TotalOrderPartitioner 能解决问题。不幸的是,它没有,因为它看起来像 Hive 的值分区,而不是键。所以我修补它(应该有子类,但我没有时间):

代替

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
}

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(value);
}

现在您可以将(修补的)TotalOrderPartitioner 设置为您的 Hive 分区器:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2

我也用过

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;

在我的测试中。

文件 out_data2 告诉 TotalOrderPartitioner 如何存储值。您通过对数据进行采样来生成 out_data2。在我的测试中,我使用了从 0 到 10 的 4 个存储桶和密钥。我使用 ad-hoc 方法生成了 out_data2:

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;


public class TotalPartitioner extends Configured implements Tool{
    public static void main(String[] args) throws Exception{
            ToolRunner.run(new TotalPartitioner(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]{1, 6}, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]{1, 9}, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    }

}

然后我将生成的 out_data2 复制到 HDFS(到 /user/yevgen/out_data2)

通过这些设置,我对数据进行了分桶/排序(请参阅我的实验列表中的最后一项)。

这是我的实验。

  • 创建样本数据

    bash> echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt

  • 创建基本测试表:

    蜂巢>创建表测试(x int);hive> 将数据本地路径“data.txt”加载到表测试中;

基本上这个表包含从 0 到 9 的值,没有顺序。

  • 演示表复制是如何工作的(实际上是 mapred.reduce.tasks 参数,它设置了要使用的最大减少任务数)

    hive> 创建表 test2(x int);

    蜂巢> 设置 mapred.reduce.tasks=4;

    hive> 插入覆盖表 test2 从测试 a 中选择 ax 连接测试 b on ax=bx; -- stupied join 强制非平凡的 map-reduce

    bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0

    1

    5

    9

  • 展示分桶。您可以看到键是随机分配的,没有任何排序顺序:

    hive> 创建由 (x) 聚类的表 test3(x int) 成 4 个桶;

    hive> 设置 hive.enforce.bucketing = true;

    蜂巢>插入覆盖表test3从测试中选择*;

    bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0

    4

    8

    0

  • 带排序的分桶。结果是部分排序的,不是完全排序的

    hive> create table test4(x int) clustered by (x) sorted by (x desc)分为4个桶;

    蜂巢>插入覆盖表test4从测试中选择*;

    bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0

    1

    5

    9

您可以看到值按升序排序。看起来像 CDH3 中的 Hive 错误?

  • 在没有集群的情况下通过语句进行部分排序:

    hive> create table test5 as select x from test distribution by x sort by x desc;

    bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0

    9

    5

    1

  • 使用我修补的 TotalOrderParitioner:

    hive> 设置 hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    蜂巢> 设置 total.order.partitioner.natural.order=false

    hive> 设置 total.order.partitioner.path=/user/training/out_data2

    hive> create table test6(x int) clustered by (x) sorted by (x)分为4个桶;

    蜂巢>插入覆盖表test6从测试中选择*;

    bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0

    1

    2

    0

    bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0

    3

    4

    5

    bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0

    7

    6

    8

    bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0

    9

于 2012-12-05T09:25:11.200 回答
10

CLUSTER BY 不产生全局排序。

接受的答案(由 Lars Yencken 撰写)通过声明减速器将接收非重叠范围来误导。正如 Anton Zaviriukhin 正确指向 BucketedTables 文档一样,CLUSTER BY 基本上是 DISTRIBUTE BY(与 bucketing 相同)加上每个 bucket/reducer 中的 SORT BY。并且 DISTRIBUTE BY 只是简单地将散列和 mods 放入桶中,虽然散列函数可以保留顺序(i 的散列 > j 的散列,如果 i > j),但散列值的 mod 不会。

这是显示重叠范围的更好示例

http://myitlearnings.com/bucketing-in-hive/

于 2016-08-04T15:59:38.757 回答
5

据我了解,简短的回答是否定的。你会得到重叠的范围。

来自SortBy 文档:“Cluster By 是 Distribute By 和 Sort By 的捷径。” “具有相同 Distribute By 列的所有行都将进入同一个 reducer。” 但是没有信息可以通过保证不重叠的范围来分发。

此外,来自DDL BucketedTables 文档:“Hive 如何在存储桶之间分配行?通常,存储桶编号由表达式 hash_function(bucketing_column) mod num_buckets 确定。” 我想 Cluster by in Select 语句使用相同的原则在减速器之间分配行,因为它的主要用途是用数据填充分桶表。

我创建了一个包含 1 个整数列“a”的表,并在那里插入了从 0 到 9 的数字。

然后我将减速器的数量设置为 2 set mapred.reduce.tasks = 2;

并且select该表中的数据带有Cluster by子句 select * from my_tab cluster by a;

并收到了我预期的结果:

0
2
4
6
8
1
3
5
7
9

所以,第一个减速器(数字 0)得到偶数(因为它们的模式 2 给出 0)

第二个减速器(编号 1)得到奇数(因为它们的模式 2 给出 1)

这就是“分发方式”的工作原理。

然后“排序依据”对每个减速器内的结果进行排序。

于 2014-07-25T15:43:44.633 回答
2

用例:当有一个大数据集时,应该像 sort by 那样进行排序,所有集合 reducer 在合并之前在内部对数据进行排序,从而提高性能。在按顺序排列时,较大数据集的性能会降低,因为所有数据都通过单个减速器传递,这会增加负载并因此需要更长的时间来执行查询。请参见下面关于 11 节点集群的示例。 在此处输入图像描述

这是 Order By 示例输出在此处输入图像描述

这是排序方式示例输出在此处输入图像描述

这个是 Cluster By example在此处输入图像描述

我观察到,sort by、cluster by和distribute by的数字是相同的,但内部机制不同。在 DISTRIBUTE BY 中:相同的列行将转到一个 reducer,例如。DISTRIBUTE BY(City) - 一列中的班加罗尔数据,一个减速器中的德里数据:

在此处输入图像描述

于 2019-03-11T07:52:55.750 回答
0

SortBy:N个或更多具有重叠范围的排序文件。

OrderBy:单输出,即完全排序。

分发方式:通过保护 N 个减速器中的每一个来分发,以获得列的非重叠范围,但不对每个减速器的输出进行排序。

欲了解更多信息http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/

ClusterBy:参考上面同样的例子,如果我们使用Cluster By x,两个reducer会进一步对x上的行进行排序:

于 2019-02-02T16:17:36.940 回答
0

如果我理解正确

1.sort by - 只对reducer内的数据进行排序

2.order by - 通过将整个数据集推送到单个减速器来全局排序。如果我们确实有很多数据(倾斜),这个过程将花费很多时间。

  1. cluster by - 通过 key hash 智能地将内容分配到 reducer 并进行排序,但不授予全局排序。一个 key(k1) 可以放入两个 reducer。第一个 reducer 获得 10K K1 数据,第二个可能获得 1K k1 数据。
于 2021-07-29T05:37:06.830 回答
0

Cluster by 是按 reducer 排序的,不是全局的。在许多书中,它也被错误地或混淆地提及。它有特殊用途,例如您将每个部门分配给特定的减速器,然后按每个部门中的员工姓名排序,并且不关心要使用的集群的部门顺序,并且由于工作负载分布在减速器之间,它的性能更高.

于 2017-04-19T14:25:56.350 回答