1

我写了一个火花作业。如下所示:

public class TestClass {

public static void main(String[] args){
String masterIp = args[0];
String appName = args[1];
String inputFile = args[2];
String output = args[3];
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<String> rdd = sparkContext.textFile(inputFile);
Integer[] keyColumns = new Integer[] {0,1,2};
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns);

Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1);
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2;
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2;

JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
      private static final long serialVersionUID = -6293440291696487370L;
      @Override
      public Tuple2<String, Integer> call(String t) throws Exception {
        String[] record = t.split(",");
        Integer[] keyColumns = broadcastJob.value();
        StringBuilder key = new StringBuilder();
        for (int index = 0; index < keyColumns.length; index++) {
          key.append(record[keyColumns[index]]);
        }
        key.append("|id=1");
        Integer value = new Integer(record[4]);
        return new Tuple2<String, Integer>(key.toString(),value);
      }}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2);
      pairRDD.saveAsTextFile(output);
   }
}

该程序计算每个键的值的总和。根据我的理解,本地组合器应该在每个节点上运行并将相同键的值相加,然后在少量数据的情况下进行混洗。但在 SparkUI 上,它显示了大量的随机读取和随机写入(几乎 58GB)。我做错什么了吗?如何知道本地合路器是否工作?

集群详细信息:-
20 个节点集群
每个节点具有 80GB 硬盘、8GB RAM、4 核
Hadoop-2.7.2
Spark-2.0.2(prebuild-with-Hadoop-2.7.x 分发版)

输入文件详细信息:-
输入文件存储在 hdfs
输入文件大小:400GB
记录数:16,129,999,990
记录列:String(2 char),int,int,String(2 char),int,int,String(2 char),字符串(2 个字符),字符串(2 个字符)

注意:最大不同键数为 1081600。
在 spark 日志中,我看到任务以 localitylevel NODE_LOCAL 运行。

在此处输入图像描述

4

1 回答 1

0

让我们分解这个问题,看看得到了什么。为了简化计算,我们假设:

  • 总记录数为 1.6e8
  • 唯一键数为 1e6
  • 拆分大小为 128MB(这似乎与您 UI 中的任务数一致)。

使用这些值,数据将被分配到 ~3200 个分区(在您的情况下为 3125 个)。这为您提供了每个拆分大约 51200 条记录。此外,如果每个键的值数量分布是均匀的,那么每个键平均应该有大约 160 条记录。

如果数据是随机分布的(例如,它不是按键排序的),您可以预期每个分区每个键的平均记录数将接近一个*。这基本上是最坏的情况,地图端合并根本不会减少数据量。

此外,您必须记住,平面文件的大小通常会明显低于序列化对象的大小。

对于现实生活中的数据,您通常可以期望从数据收集过程中出现某种类型的顺序,因此事情应该比我们上面计算的要好,但底线是,如果数据尚未按分区分组,则地图侧组合可能不会提供任何改进一点也不。

您可能可以通过使用更大的拆分来减少洗牌的数据量(256MB 会给您一个超过 100K 的分区),但它的代价是更长的 GC 暂停和可能的其他 GC 问题。


* 您可以通过更换样本来模拟这一点:

import pandas as pd
import numpy as np

(pd
    .DataFrame({"x": np.random.choice(np.arange(3200), size=160, replace=True)})
    .groupby("x")
    .x.count()
    .mean())

或者只是想想随机分配160个球到3200个桶的问题。

于 2017-03-08T14:49:38.043 回答