我正在尝试使用Maxmind 扫雪机库来提取数据框中每个 IP 上的地理数据。
我们正在使用 Spark SQL(spark 版本 2.1.0),我在以下类中创建了一个 UDF:
class UdfDefinitions @Inject() extends Serializable with StrictLogging {
sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
val s3Config = configuration.databases.dataWarehouse.s3
val lruCacheConst = 20000
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName) ),
ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)
def lookupIP(ip: String): LookupIPResult = {
val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
loc match {
case None => LookupIPResult("", "", "")
case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""),
x.city.getOrElse(""), x.regionName.getOrElse(""))
}
}
val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)
}
目的是在 UDF 外部创建指向文件 (ipLookups) 的指针并在内部使用它,因此不要在每一行上打开文件。这会出现任务未序列化的错误,并且当我们在 UDF 中使用 addFiles 时,会出现打开文件过多的错误(使用大型数据集时,在小型数据集上它确实有效)。
这个线程展示了如何使用 RDD 来解决问题,但我们想使用 Spark SQL。在 spark 序列化中使用 maxmind geoip
有什么想法吗?谢谢