1

我在 spark 中有一个 UDF(在 EMR 上运行),用 scala 编写,使用 uaparser 库用于 scala(uap-scala)从用户代理解析设备。在小型集合上工作时它工作正常(5000 行),但在大型集合(2M)上运行时它工作得非常慢。我尝试收集 Dataframe 以列出并在驱动程序上循环它,这也很慢,是什么让我相信 UDF 在驱动程序而不是工人上运行

  1. 我怎样才能确定这一点?有人有其他理论吗?
  2. 如果是这样,为什么会发生这种情况?

这是 udf 代码:

def calcDevice(userAgent: String): String = {

val userAgentVal = Option(userAgent).getOrElse("")
Parser.get.parse(userAgentVal).device.family
}

val calcDeviceValUDF: UserDefinedFunction = udf(calcDevice _)

用法:

.withColumn("agentDevice", udfDefinitions.calcDeviceValUDF($"userAgent"))

谢谢尼尔

4

3 回答 3

4

问题在于在 UDF itelf 中实例化构建器。解决方案是在 udf 之外创建对象并在行级别使用它:

val userAgentAnalyzerUAParser = Parser.get

def calcDevice(userAgent: String): String = {

val userAgentVal = Option(userAgent).getOrElse("")
userAgentAnalyzerUAParser.parse(userAgentVal).device.family
}

val calcDeviceValUDF: UserDefinedFunction = udf(calcDevice _)
于 2017-11-13T17:02:03.777 回答
1

我们遇到了 Spark 作业挂起的相同问题。我们做的另一件事是使用广播变量。在所有更改之后,此 UDF 实际上非常慢,因此您的里程可能会有所不同。另一个警告是获取SparkSession;我们在 Databricks 中运行,如果SparkSession不可用,那么它将崩溃;如果您需要继续工作,那么您必须处理该失败案例。

object UDFs extends Serializable {
  val uaParser = SparkSession.getActiveSession.map(_.sparkContext.broadcast(CachingParser.default(100000)))

  val parseUserAgent = udf { (userAgent: String) =>
    // We will simply return an empty map if uaParser is None because that would mean
    // there is no active spark session to broadcast the parser.
    //
    // Also if you wrap the potentially null value in an Option and use flatMap and map to
    // add type safety it becomes slower.
    if (userAgent == null || uaParser.isEmpty) {
      Map[String, Map[String, String]]()
    } else {
      val parsed = uaParser.get.value.parse(userAgent)
      Map(
        "browser" -> Map(
          "family"      -> parsed.userAgent.family,
          "major"       -> parsed.userAgent.major.getOrElse(""),
          "minor"       -> parsed.userAgent.minor.getOrElse(""),
          "patch"       -> parsed.userAgent.patch.getOrElse("")
        ),
        "os" -> Map(
          "family"      -> parsed.os.family,
          "major"       -> parsed.os.major.getOrElse(""),
          "minor"       -> parsed.os.minor.getOrElse(""),
          "patch"       -> parsed.os.patch.getOrElse(""),
          "patch-minor" -> parsed.os.patchMinor.getOrElse("")
        ),
        "device" -> Map(
          "family"      -> parsed.device.family,
          "brand"       -> parsed.device.brand.getOrElse(""),
          "model"       -> parsed.device.model.getOrElse("")
        )
      )
    }
  }    
}

您可能还想使用CachingParser的大小。

于 2018-09-18T14:09:54.900 回答
0

鉴于Parser.get.parse问题中缺少,可以仅判断udf部分。

为了性能,您可以删除Option

def calcDevice(userAgent: String): String = {
  val userAgentVal = if(userAgent == null) "" else userAgent
  Parser.get.parse(userAgentVal).device.family
}
于 2017-11-13T16:18:55.673 回答