我正在尝试从 Spark RDD 创建 MapFile,但找不到足够的信息。到目前为止,这是我的步骤:
我开始时,
rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
这引发了一个异常,因为MapFiles
必须对其进行排序。所以我修改为:
rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
效果很好,并且创建了我的 MapFile。所以下一步是访问文件。使用parts
创建的目录名称失败,说它找不到data
文件。回到谷歌,我发现为了访问MapFile
我需要使用的部分:
Object ret = new Object();//My actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K,V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p key, ret);
天真地,我忽略了这HashPartioner
一点,并期望这会找到我的条目,但没有运气。所以我的下一步是遍历读者并做一个get(..)
. 该解决方案确实有效,但速度极慢,因为文件由 128 个任务创建,导致 128 个part
文件。
所以我调查了它的重要性,HashPartitioner
发现它在内部使用它来识别要使用哪个阅读器,但似乎 Spark 没有使用相同的分区逻辑。所以我修改为:
rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
但同样 2HashPartioner
不匹配。所以问题部分...
- 有没有办法
MapFiles
有效地组合(因为这会忽略分区逻辑)? MapFileOutputFormat.getReaders(new Path(file), new Configuration());
很慢。我可以更有效地识别读者吗?- 我使用 MapR-FS 作为底层 DFS。这将使用相同的
HashParitioner
实现吗? - 有没有办法避免重新分区,或者应该对整个文件进行排序?(与在分区内排序相反)
- 我也遇到了一个例外
_SUCCESS/data does not exist
。我需要手动删除这个文件吗?
任何有关此的链接将不胜感激。
PS。如果条目已排序,那么如何使用 HashPartitioner
来定位正确的Reader
?这意味着数据parts
是Hash Partitioned
然后按键排序的。所以我也尝试rdd.repartiotionAndSortWithinPartitions(new HashPartitioner(280))
了,但再次没有任何运气。