1

我正在研究 SparkSQL。我使用 JavaPairRDD 从 HBase 获取数据,然后做了一个映射。在地图中,我将所有键保存到一个集合中。为了强制完成此映射,请遵循 collect()。在此之后,我使用 Set 中的值进行其他操作。

该程序可以在我的本地 PC 上完美运行。但是当我把它放到集群上(2个工人)时,就会出现执行障碍。在映射转换之前,执行 Set 操作。

代码流程如下: 从 Hbase 获取数据:

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =     jsc.newAPIHadoopRDD(hbase_conf,
                TableInputFormat.class, ImmutableBytesWritable.class,
                Result.class);

转换数据:

JavaRDD<Map<String, String>> data = hBaseRDD.map(
                new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){
                    public Map<String, String> call(
                            Tuple2<ImmutableBytesWritable, Result> re)
                            throws Exception {
                        byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload"));
                        Map<String, String> map = new ConcurrentHashMap<String, String>();

                        String primaryKey = new String(re._1().get());
                        map.put("primaryKey", primaryKey);

                        if(payload != null)
                            map.put("payload", new String(payload));

                        Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo"));
                        if(tmpMetaMap != null){
                            for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){

                                String tmpKey = Bytes.toString(entry.getKey());
                                String tmpValue = Bytes.toString(entry.getValue());

                                map.put(tmpKey, tmpValue);
    //save result to the set
                                keySet.add(tmpKey);
                            }
                        }
                        return map;
                    }
                });

强制上面的地图运行:

data.collect();

获取 Set 的结果:

StringBuilder sb = new StringBuilder();

        for(String fieldName: keySet){

            sb.append(fieldName).append(",");
        }

当我在本地运行代码时,我可以获得所有结果。但是当我在集群上运行它时, sb 没有任何价值。

4

2 回答 2

0

How you have defined keyset? Try to define it as static or otherwise use foreach instead of map which will bring all the data to the DriverSide.Hope this answer your question

于 2014-12-02T18:54:24.263 回答
0

此问题与操作的顺序无关,而是与此类操作集群中发生的位置有关。

在 Spark 中,有两种类型的操作:转换和操作。

转换通过对内容应用一些函数将 RDD 转换为另一个 RDD。这是一种纯函数式方法,没有副作用。动作采用 RDD 并产生其他东西,例如文件或本地数据结构:这些操作将 RDD 的数据具体化为其他形式。

在这种情况下,转换函数:map正在使用副作用,因为keyset预计在地图转换期间会发生变异。鉴于它keyset是在转换函数范围之外定义的,它将被序列化并发送给执行程序,但是远程发生的任何突变都不会在驱动程序中恢复。

如果我们考虑一下,每个执行程序都将在数据分区上应用转换,因此无论“keyset”以什么结尾,都将只是每个分区的部分视图。

对此进行建模的正确方法是根据 RDD 转换和操作重新定义操作。

从上面的代码中,看起来我们想要将一些输入转换为一个RDD[Map[String,String]],并且我们有兴趣在驱动程序上收集所有条目中的键集,这些条目不是来自该结果的“主键”和“有效负载”。

在 Spark 中,这可能类似于:

// data = RDD[Map[String, String]]
// first we get all the keys from all the maps
val keys = data.map{entry => entry.keys}
// now we collect that information on the driver
val allKeys = keys.collect
// we transform the resulting array into a set - this will remove duplicates by definition
val allKeySet = allKeys.toSet
// We need still to remove "primaryKey" and "payload"
val keySet = fullKeySet.diff(Set("primaryKey","payload"))

在 Java 中,代码有点冗长,但结构和思想是相同的。

于 2014-12-03T00:57:03.720 回答