16

最近我打算将我的独立 python ML 代码迁移到 spark。事实证明,ML 管道spark.ml非常方便,具有用于链接算法阶段和超参数网格搜索的流线型 API。

尽管如此,我发现它对现有文档中的一项重要功能的支持是模糊的:缓存中间结果。当流水线涉及计算密集阶段时,此功能的重要性就显现出来了。

例如,在我的例子中,我使用一个巨大的稀疏矩阵对时间序列数据执行多个移动平均值,以形成输入特征。矩阵的结构由一些超参数决定。这一步结果成为整个管道的瓶颈,因为我必须在运行时构造矩阵。

在参数搜索期间,除了这个“结构参数”之外,我通常还有其他参数要检查。因此,如果我可以在“结构参数”不变的情况下重用巨大的矩阵,我可以节省大量时间。出于这个原因,我特意编写了代码来缓存和重用这些中间结果。

所以我的问题是:Spark 的 ML 管道可以自动处理中间缓存吗?还是我必须手动形成代码才能这样做?如果是这样,是否有任何最佳实践可供学习?

PS我查看了官方文档和其他一些材料,但似乎没有一个讨论这个话题。

4

1 回答 1

11

所以我遇到了同样的问题,我解决的方法是我实现了自己的 PipelineStage,它缓存输入 DataSet 并按原样返回。

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class Cacher(val uid: String) extends Transformer with DefaultParamsWritable {
  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF.cache()

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

  override def transformSchema(schema: StructType): StructType = schema

  def this() = this(Identifiable.randomUID("CacherTransformer"))
}

要使用它,您将执行以下操作:

new Pipeline().setStages(Array(stage1, new Cacher(), stage2))
于 2016-09-01T15:05:20.113 回答