0

我有一种情况,当给定批处理时,底层函数的运行效率显着提高。我有这样的现有代码:

// subjects: RDD[Subject]
val subjects = Subject.load(job, sparkContext, config)
val classifications = subjects.flatMap(subject => classify(subject)).reduceByKey(_ + _)
classifications.saveAsTextFile(config.output)

classify方法适用于单个元素,但对元素组进行操作会更有效。我考虑使用coalesce将 RDD 拆分为块并将每个块作为一个组进行操作,但是这样做有两个问题:

  1. 我不确定如何返回映射的 RDD。
  2. classify事先不知道组应该有多大,并且根据输入的内容而有所不同。

关于如何在理想情况下调用的示例代码classify(输出很混乱,因为它不会溢出非常大的输入):

def classifyRdd (subjects: RDD[Subject]): RDD[(String, Long)] = {
  val classifier = new Classifier
  subjects.foreach(subject => classifier.classifyInBatches(subject))
  classifier.classifyRemaining
  classifier.results
}

这种方式classifyInBatches可以在内部有这样的代码:

def classifyInBatches(subject: Subject) {
  if (!internals.canAdd(subject)) {
    partialResults.add(internals.processExisting)
  }
  internals.add(subject) // Assumption: at least one will fit.
}

我可以在 Apache Spark 中做什么来允许类似这样的行为?

4

1 回答 1

2

尝试使用该mapPartitions方法,该方法允许您的 map 函数将分区用作迭代器并生成输出迭代器。

您应该能够编写如下内容:

subjectsRDD.mapPartitions { subjects =>
  val classifier = new Classifier
  subjects.foreach(subject => classifier.classifyInBatches(subject))
  classifier.classifyRemaining
  classifier.results
}
于 2014-07-22T22:18:29.357 回答