我正在研究 SparkSQL。我使用 JavaPairRDD 从 HBase 获取数据,然后做了一个映射。在地图中,我将所有键保存到一个集合中。为了强制完成此映射,请遵循 collect()。在此之后,我使用 Set 中的值进行其他操作。
该程序可以在我的本地 PC 上完美运行。但是当我把它放到集群上(2个工人)时,就会出现执行障碍。在映射转换之前,执行 Set 操作。
代码流程如下: 从 Hbase 获取数据:
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(hbase_conf,
TableInputFormat.class, ImmutableBytesWritable.class,
Result.class);
转换数据:
JavaRDD<Map<String, String>> data = hBaseRDD.map(
new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){
public Map<String, String> call(
Tuple2<ImmutableBytesWritable, Result> re)
throws Exception {
byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload"));
Map<String, String> map = new ConcurrentHashMap<String, String>();
String primaryKey = new String(re._1().get());
map.put("primaryKey", primaryKey);
if(payload != null)
map.put("payload", new String(payload));
Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo"));
if(tmpMetaMap != null){
for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){
String tmpKey = Bytes.toString(entry.getKey());
String tmpValue = Bytes.toString(entry.getValue());
map.put(tmpKey, tmpValue);
//save result to the set
keySet.add(tmpKey);
}
}
return map;
}
});
强制上面的地图运行:
data.collect();
获取 Set 的结果:
StringBuilder sb = new StringBuilder();
for(String fieldName: keySet){
sb.append(fieldName).append(",");
}
当我在本地运行代码时,我可以获得所有结果。但是当我在集群上运行它时, sb 没有任何价值。