We ran into the same problem with Spark jobs hanging. One other thing we did was use a broadcast variable. Even after all of these changes the UDF is still quite slow, so your mileage may vary. Another caveat is obtaining the SparkSession: we run on Databricks, and if no SparkSession is available the code will crash, so if you need things to keep working you have to handle that failure case.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.uaparser.scala.CachingParser

object UDFs extends Serializable {
  // None when there is no active SparkSession to broadcast the parser with.
  val uaParser = SparkSession.getActiveSession.map(_.sparkContext.broadcast(CachingParser.default(100000)))

  val parseUserAgent = udf { (userAgent: String) =>
    // We simply return an empty map if uaParser is None, because that would mean
    // there is no active Spark session to broadcast the parser.
    //
    // Also, wrapping the potentially null value in an Option and using flatMap and map
    // for type safety makes it slower.
    if (userAgent == null || uaParser.isEmpty) {
      Map[String, Map[String, String]]()
    } else {
      val parsed = uaParser.get.value.parse(userAgent)
      Map(
        "browser" -> Map(
          "family" -> parsed.userAgent.family,
          "major" -> parsed.userAgent.major.getOrElse(""),
          "minor" -> parsed.userAgent.minor.getOrElse(""),
          "patch" -> parsed.userAgent.patch.getOrElse("")
        ),
        "os" -> Map(
          "family" -> parsed.os.family,
          "major" -> parsed.os.major.getOrElse(""),
          "minor" -> parsed.os.minor.getOrElse(""),
          "patch" -> parsed.os.patch.getOrElse(""),
          "patch-minor" -> parsed.os.patchMinor.getOrElse("")
        ),
        "device" -> Map(
          "family" -> parsed.device.family,
          "brand" -> parsed.device.brand.getOrElse(""),
          "model" -> parsed.device.model.getOrElse("")
        )
      )
    }
  }
}
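For reference, here is a minimal sketch of how the UDF might be applied; the logs DataFrame and the user_agent column name are illustrative, not part of the original code. Note that UDFs must be referenced only after the SparkSession exists, otherwise uaParser is None and the UDF just returns empty maps.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Hypothetical input data; in practice this would be your log DataFrame.
val logs = Seq(
  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
).toDF("user_agent")

// The UDF returns a Map[String, Map[String, String]], so nested values can be
// pulled out with map lookups on the resulting column.
val withUa = logs.withColumn("ua", UDFs.parseUserAgent(col("user_agent")))
withUa
  .select(
    col("ua")("browser")("family").as("browser_family"),
    col("ua")("os")("family").as("os_family"),
    col("ua")("device")("family").as("device_family")
  )
  .show(truncate = false)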
You may also want to experiment with the size of the CachingParser.
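As a rough illustration (the numbers below are arbitrary), the size is just the argument passed when constructing the parser: a larger cache trades memory for fewer repeated parses of identical user-agent strings, while a smaller one saves memory if your user agents rarely repeat.

// Illustrative values only: tune the cache size to your data and memory budget.
val bigCacheParser = CachingParser.default(500000)
val smallCacheParser = CachingParser.default(10000)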