我有一种情况,当给定批处理时,底层函数的运行效率显着提高。我有这样的现有代码:
// subjects: RDD[Subject]
val subjects = Subject.load(job, sparkContext, config)
val classifications = subjects.flatMap(subject => classify(subject)).reduceByKey(_ + _)
classifications.saveAsTextFile(config.output)
该classify
方法适用于单个元素,但对元素组进行操作会更有效。我考虑使用coalesce
将 RDD 拆分为块并将每个块作为一个组进行操作,但是这样做有两个问题:
- 我不确定如何返回映射的 RDD。
classify
事先不知道组应该有多大,并且根据输入的内容而有所不同。
关于如何在理想情况下调用的示例代码classify
(输出很混乱,因为它不会溢出非常大的输入):
def classifyRdd (subjects: RDD[Subject]): RDD[(String, Long)] = {
val classifier = new Classifier
subjects.foreach(subject => classifier.classifyInBatches(subject))
classifier.classifyRemaining
classifier.results
}
这种方式classifyInBatches
可以在内部有这样的代码:
def classifyInBatches(subject: Subject) {
if (!internals.canAdd(subject)) {
partialResults.add(internals.processExisting)
}
internals.add(subject) // Assumption: at least one will fit.
}
我可以在 Apache Spark 中做什么来允许类似这样的行为?