11

我试图创建和保存带有自定义阶段的管道。我需要使用 a添加column到我的。因此,我想知道是否可以将 a或类似动作转换为?DataFrameUDFUDFTransformer

我的自定义UDF看起来像这样,我想学习如何使用UDFas a custom来做到这一点Transformer

def getFeatures(n: String) = {
    val NUMBER_FEATURES = 4  
    val name = n.split(" +")(0).toLowerCase
    ((1 to NUMBER_FEATURES)
         .filter(size => size <= name.length)
         .map(size => name.substring(name.length - size)))
} 

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
4

3 回答 3

15

它不是一个功能齐全的解决方案,但您可以从以下内容开始:

import org.apache.spark.ml.{UnaryTransformer}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class NGramTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], NGramTokenizer]  {

  def this() = this(Identifiable.randomUID("ngramtokenizer"))

  override protected def createTransformFunc: String => Seq[String] = {
    getFeatures _
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType)
  }

  override protected def outputDataType: DataType = {
    new ArrayType(StringType, true)
  }
}

快速检查:

val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")

transformer.transform(df).show
// +---+------+------------------+
// |  k|     v|                vs|
// +---+------+------------------+
// |  1|abcdef|[f, ef, def, cdef]|
// |  2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+

您甚至可以尝试将其概括为以下内容:

import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
  override val uid: String,
  f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]]  {

  override protected def createTransformFunc: T => U = f

  override protected def validateInputType(inputType: DataType): Unit = 
    require(inputType == schemaFor[T].dataType)

  override protected def outputDataType: DataType = schemaFor[U].dataType
}

val transformer = new UnaryUDFTransformer("featurize", getFeatures)
  .setInputCol("v")
  .setOutputCol("vs")

如果您想使用 UDF 而不是包装函数,则必须Transformer直接扩展并覆盖transform方法。不幸的是,大多数有用的类都是私有的,所以它可能相当棘手。

或者,您可以注册 UDF:

spark.udf.register("getFeatures", getFeatures _)

并使用SQLTransformer

import org.apache.spark.ml.feature.SQLTransformer

val transformer = new SQLTransformer()
  .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")

transformer.transform(df).show
// +---+------+------------------+
// |  k|     v|                vs|
// +---+------+------------------+
// |  1|abcdef|[f, ef, def, cdef]|
// |  2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
于 2016-02-03T17:20:31.473 回答
1

我最初尝试扩展TransformerUnaryTransformer抽象,但遇到了我的应用程序无法访问DefaultParamsWriteable的问题。作为可能与您的问题相关的示例,我创建了一个简单的术语规范化器作为 UDF 遵循此示例。我的目标是将术语与模式和集合进行匹配,以用通用术语替换它们。例如:

"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr"

这是课

import scala.util.matching.Regex

class TermNormalizer(normMap: Map[Any, String]) {
  val normalizationMap = normMap

  def normalizeTerms(terms: Seq[String]): Seq[String] = {
    var termsUpdated = terms
    for ((term, idx) <- termsUpdated.view.zipWithIndex) {
      for (normalizer <- normalizationMap.keys: Iterable[Any]) {
        normalizer match {
          case (regex: Regex) =>
            if (!regex.findFirstIn(term).isEmpty) termsUpdated = 
              termsUpdated.updated(idx, normalizationMap(regex))
          case (set: Set[String]) =>
            if (set.contains(term)) termsUpdated = 
              termsUpdated.updated(idx, normalizationMap(set))
        }
      }
    }
    termsUpdated
  }
}

我这样使用它:

val testMap: Map[Any, String] = Map("hadoop".r -> "elephant",
  "spark".r -> "sparky", "cool".r -> "neat", 
  Set("123", "456") -> "set1",
  Set("789", "10") -> "set2")

val testTermNormalizer = new TermNormalizer(testMap)
val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String]))

val trainingTest = sqlContext.createDataFrame(Seq(
  (0L, "spark is cool 123", 1.0),
  (1L, "adsjkfadfk akjdsfhad 456", 0.0),
  (2L, "spark rocks my socks 789 10", 1.0),
  (3L, "hadoop is cool 10", 0.0)
)).toDF("id", "text", "label")

val testTokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val tokenizedTrainingTest = testTokenizer.transform(trainingTest)
println(tokenizedTrainingTest
  .select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false))

现在我更仔细地阅读了这个问题,听起来你在问如何避免这样做,哈哈。无论如何,我仍然会发布它,以防将来有人正在寻找一种简单的方法来应用类似转换器的功能

于 2016-03-28T17:46:21.730 回答
0

如果您也希望使转换器可写,那么您可以在您选择的公共包中的 sharedParams 库中重新实现 HasInputCol 等特征,然后将它们与 DefaultParamsWritable 特征一起使用以使转换器具有持久性。

这样,您还可以避免将部分代码放在 spark core ml 包中,但您需要在自己的包中维护一组并行参数。这不是一个真正的问题,因为它们几乎不会改变。

但是请在他们的 JIRA 板上跟踪错误,该错误要求将一些常见的 sharedParams 公开而不是私有给 ml,以便人们可以直接使用来自外部课程的那些。

于 2017-03-27T12:12:25.877 回答